1# Copyright 2012 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, 10# software distributed under the License is distributed on an 11# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 12# either express or implied. See the License for the specific 13# language governing permissions and limitations under the License. 14 15"""File Interface for Google Cloud Storage.""" 16 17 18 19from __future__ import with_statement 20 21 22 23__all__ = ['delete', 24 'listbucket', 25 'open', 26 'stat', 27 ] 28 29import logging 30import StringIO 31import urllib 32import xml.etree.cElementTree as ET 33from . import api_utils 34from . import common 35from . import errors 36from . import storage_api 37 38 39 40def open(filename, 41 mode='r', 42 content_type=None, 43 options=None, 44 read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, 45 retry_params=None, 46 _account_id=None): 47 """Opens a Google Cloud Storage file and returns it as a File-like object. 48 49 Args: 50 filename: A Google Cloud Storage filename of form '/bucket/filename'. 51 mode: 'r' for reading mode. 'w' for writing mode. 52 In reading mode, the file must exist. In writing mode, a file will 53 be created or be overrode. 54 content_type: The MIME type of the file. str. Only valid in writing mode. 55 options: A str->basestring dict to specify additional headers to pass to 56 GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. 57 Supported options are x-goog-acl, x-goog-meta-, cache-control, 58 content-disposition, and content-encoding. 59 Only valid in writing mode. 60 See https://developers.google.com/storage/docs/reference-headers 61 for details. 62 read_buffer_size: The buffer size for read. Read keeps a buffer 63 and prefetches another one. To minimize blocking for large files, 64 always read by buffer size. To minimize number of RPC requests for 65 small files, set a large buffer size. Max is 30MB. 66 retry_params: An instance of api_utils.RetryParams for subsequent calls 67 to GCS from this file handle. If None, the default one is used. 68 _account_id: Internal-use only. 69 70 Returns: 71 A reading or writing buffer that supports File-like interface. Buffer 72 must be closed after operations are done. 73 74 Raises: 75 errors.AuthorizationError: if authorization failed. 76 errors.NotFoundError: if an object that's expected to exist doesn't. 77 ValueError: invalid open mode or if content_type or options are specified 78 in reading mode. 79 """ 80 common.validate_file_path(filename) 81 api = storage_api._get_storage_api(retry_params=retry_params, 82 account_id=_account_id) 83 filename = api_utils._quote_filename(filename) 84 85 if mode == 'w': 86 common.validate_options(options) 87 return storage_api.StreamingBuffer(api, filename, content_type, options) 88 elif mode == 'r': 89 if content_type or options: 90 raise ValueError('Options and content_type can only be specified ' 91 'for writing mode.') 92 return storage_api.ReadBuffer(api, 93 filename, 94 buffer_size=read_buffer_size) 95 else: 96 raise ValueError('Invalid mode %s.' % mode) 97 98 99def delete(filename, retry_params=None, _account_id=None): 100 """Delete a Google Cloud Storage file. 101 102 Args: 103 filename: A Google Cloud Storage filename of form '/bucket/filename'. 104 retry_params: An api_utils.RetryParams for this call to GCS. If None, 105 the default one is used. 106 _account_id: Internal-use only. 107 108 Raises: 109 errors.NotFoundError: if the file doesn't exist prior to deletion. 110 """ 111 api = storage_api._get_storage_api(retry_params=retry_params, 112 account_id=_account_id) 113 common.validate_file_path(filename) 114 filename = api_utils._quote_filename(filename) 115 status, resp_headers, content = api.delete_object(filename) 116 errors.check_status(status, [204], filename, resp_headers=resp_headers, 117 body=content) 118 119 120def stat(filename, retry_params=None, _account_id=None): 121 """Get GCSFileStat of a Google Cloud storage file. 122 123 Args: 124 filename: A Google Cloud Storage filename of form '/bucket/filename'. 125 retry_params: An api_utils.RetryParams for this call to GCS. If None, 126 the default one is used. 127 _account_id: Internal-use only. 128 129 Returns: 130 a GCSFileStat object containing info about this file. 131 132 Raises: 133 errors.AuthorizationError: if authorization failed. 134 errors.NotFoundError: if an object that's expected to exist doesn't. 135 """ 136 common.validate_file_path(filename) 137 api = storage_api._get_storage_api(retry_params=retry_params, 138 account_id=_account_id) 139 status, headers, content = api.head_object( 140 api_utils._quote_filename(filename)) 141 errors.check_status(status, [200], filename, resp_headers=headers, 142 body=content) 143 file_stat = common.GCSFileStat( 144 filename=filename, 145 st_size=common.get_stored_content_length(headers), 146 st_ctime=common.http_time_to_posix(headers.get('last-modified')), 147 etag=headers.get('etag'), 148 content_type=headers.get('content-type'), 149 metadata=common.get_metadata(headers)) 150 151 return file_stat 152 153 154def _copy2(src, dst, metadata=None, retry_params=None): 155 """Copy the file content from src to dst. 156 157 Internal use only! 158 159 Args: 160 src: /bucket/filename 161 dst: /bucket/filename 162 metadata: a dict of metadata for this copy. If None, old metadata is copied. 163 For example, {'x-goog-meta-foo': 'bar'}. 164 retry_params: An api_utils.RetryParams for this call to GCS. If None, 165 the default one is used. 166 167 Raises: 168 errors.AuthorizationError: if authorization failed. 169 errors.NotFoundError: if an object that's expected to exist doesn't. 170 """ 171 common.validate_file_path(src) 172 common.validate_file_path(dst) 173 174 if metadata is None: 175 metadata = {} 176 copy_meta = 'COPY' 177 else: 178 copy_meta = 'REPLACE' 179 metadata.update({'x-goog-copy-source': src, 180 'x-goog-metadata-directive': copy_meta}) 181 182 api = storage_api._get_storage_api(retry_params=retry_params) 183 status, resp_headers, content = api.put_object( 184 api_utils._quote_filename(dst), headers=metadata) 185 errors.check_status(status, [200], src, metadata, resp_headers, body=content) 186 187 188def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, 189 delimiter=None, retry_params=None, _account_id=None): 190 """Returns a GCSFileStat iterator over a bucket. 191 192 Optional arguments can limit the result to a subset of files under bucket. 193 194 This function has two modes: 195 1. List bucket mode: Lists all files in the bucket without any concept of 196 hierarchy. GCS doesn't have real directory hierarchies. 197 2. Directory emulation mode: If you specify the 'delimiter' argument, 198 it is used as a path separator to emulate a hierarchy of directories. 199 In this mode, the "path_prefix" argument should end in the delimiter 200 specified (thus designates a logical directory). The logical directory's 201 contents, both files and subdirectories, are listed. The names of 202 subdirectories returned will end with the delimiter. So listbucket 203 can be called with the subdirectory name to list the subdirectory's 204 contents. 205 206 Args: 207 path_prefix: A Google Cloud Storage path of format "/bucket" or 208 "/bucket/prefix". Only objects whose fullpath starts with the 209 path_prefix will be returned. 210 marker: Another path prefix. Only objects whose fullpath starts 211 lexicographically after marker will be returned (exclusive). 212 prefix: Deprecated. Use path_prefix. 213 max_keys: The limit on the number of objects to return. int. 214 For best performance, specify max_keys only if you know how many objects 215 you want. Otherwise, this method requests large batches and handles 216 pagination for you. 217 delimiter: Use to turn on directory mode. str of one or multiple chars 218 that your bucket uses as its directory separator. 219 retry_params: An api_utils.RetryParams for this call to GCS. If None, 220 the default one is used. 221 _account_id: Internal-use only. 222 223 Examples: 224 For files "/bucket/a", 225 "/bucket/bar/1" 226 "/bucket/foo", 227 "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", 228 229 Regular mode: 230 listbucket("/bucket/f", marker="/bucket/foo/1") 231 will match "/bucket/foo/2/1", "/bucket/foo/3/1". 232 233 Directory mode: 234 listbucket("/bucket/", delimiter="/") 235 will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". 236 listbucket("/bucket/foo/", delimiter="/") 237 will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" 238 239 Returns: 240 Regular mode: 241 A GCSFileStat iterator over matched files ordered by filename. 242 The iterator returns GCSFileStat objects. filename, etag, st_size, 243 st_ctime, and is_dir are set. 244 245 Directory emulation mode: 246 A GCSFileStat iterator over matched files and directories ordered by 247 name. The iterator returns GCSFileStat objects. For directories, 248 only the filename and is_dir fields are set. 249 250 The last name yielded can be used as next call's marker. 251 """ 252 if prefix: 253 common.validate_bucket_path(path_prefix) 254 bucket = path_prefix 255 else: 256 bucket, prefix = common._process_path_prefix(path_prefix) 257 258 if marker and marker.startswith(bucket): 259 marker = marker[len(bucket) + 1:] 260 261 api = storage_api._get_storage_api(retry_params=retry_params, 262 account_id=_account_id) 263 options = {} 264 if marker: 265 options['marker'] = marker 266 if max_keys: 267 options['max-keys'] = max_keys 268 if prefix: 269 options['prefix'] = prefix 270 if delimiter: 271 options['delimiter'] = delimiter 272 273 return _Bucket(api, bucket, options) 274 275 276class _Bucket(object): 277 """A wrapper for a GCS bucket as the return value of listbucket.""" 278 279 def __init__(self, api, path, options): 280 """Initialize. 281 282 Args: 283 api: storage_api instance. 284 path: bucket path of form '/bucket'. 285 options: a dict of listbucket options. Please see listbucket doc. 286 """ 287 self._init(api, path, options) 288 289 def _init(self, api, path, options): 290 self._api = api 291 self._path = path 292 self._options = options.copy() 293 self._get_bucket_fut = self._api.get_bucket_async( 294 self._path + '?' + urllib.urlencode(self._options)) 295 self._last_yield = None 296 self._new_max_keys = self._options.get('max-keys') 297 298 def __getstate__(self): 299 options = self._options 300 if self._last_yield: 301 options['marker'] = self._last_yield.filename[len(self._path) + 1:] 302 if self._new_max_keys is not None: 303 options['max-keys'] = self._new_max_keys 304 return {'api': self._api, 305 'path': self._path, 306 'options': options} 307 308 def __setstate__(self, state): 309 self._init(state['api'], state['path'], state['options']) 310 311 def __iter__(self): 312 """Iter over the bucket. 313 314 Yields: 315 GCSFileStat: a GCSFileStat for an object in the bucket. 316 They are ordered by GCSFileStat.filename. 317 """ 318 total = 0 319 max_keys = self._options.get('max-keys') 320 321 while self._get_bucket_fut: 322 status, resp_headers, content = self._get_bucket_fut.get_result() 323 errors.check_status(status, [200], self._path, resp_headers=resp_headers, 324 body=content, extras=self._options) 325 326 if self._should_get_another_batch(content): 327 self._get_bucket_fut = self._api.get_bucket_async( 328 self._path + '?' + urllib.urlencode(self._options)) 329 else: 330 self._get_bucket_fut = None 331 332 root = ET.fromstring(content) 333 dirs = self._next_dir_gen(root) 334 files = self._next_file_gen(root) 335 next_file = files.next() 336 next_dir = dirs.next() 337 338 while ((max_keys is None or total < max_keys) and 339 not (next_file is None and next_dir is None)): 340 total += 1 341 if next_file is None: 342 self._last_yield = next_dir 343 next_dir = dirs.next() 344 elif next_dir is None: 345 self._last_yield = next_file 346 next_file = files.next() 347 elif next_dir < next_file: 348 self._last_yield = next_dir 349 next_dir = dirs.next() 350 elif next_file < next_dir: 351 self._last_yield = next_file 352 next_file = files.next() 353 else: 354 logging.error( 355 'Should never reach. next file is %r. next dir is %r.', 356 next_file, next_dir) 357 if self._new_max_keys: 358 self._new_max_keys -= 1 359 yield self._last_yield 360 361 def _next_file_gen(self, root): 362 """Generator for next file element in the document. 363 364 Args: 365 root: root element of the XML tree. 366 367 Yields: 368 GCSFileStat for the next file. 369 """ 370 for e in root.getiterator(common._T_CONTENTS): 371 st_ctime, size, etag, key = None, None, None, None 372 for child in e.getiterator('*'): 373 if child.tag == common._T_LAST_MODIFIED: 374 st_ctime = common.dt_str_to_posix(child.text) 375 elif child.tag == common._T_ETAG: 376 etag = child.text 377 elif child.tag == common._T_SIZE: 378 size = child.text 379 elif child.tag == common._T_KEY: 380 key = child.text 381 yield common.GCSFileStat(self._path + '/' + key, 382 size, etag, st_ctime) 383 e.clear() 384 yield None 385 386 def _next_dir_gen(self, root): 387 """Generator for next directory element in the document. 388 389 Args: 390 root: root element in the XML tree. 391 392 Yields: 393 GCSFileStat for the next directory. 394 """ 395 for e in root.getiterator(common._T_COMMON_PREFIXES): 396 yield common.GCSFileStat( 397 self._path + '/' + e.find(common._T_PREFIX).text, 398 st_size=None, etag=None, st_ctime=None, is_dir=True) 399 e.clear() 400 yield None 401 402 def _should_get_another_batch(self, content): 403 """Whether to issue another GET bucket call. 404 405 Args: 406 content: response XML. 407 408 Returns: 409 True if should, also update self._options for the next request. 410 False otherwise. 411 """ 412 if ('max-keys' in self._options and 413 self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): 414 return False 415 416 elements = self._find_elements( 417 content, set([common._T_IS_TRUNCATED, 418 common._T_NEXT_MARKER])) 419 if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': 420 return False 421 422 next_marker = elements.get(common._T_NEXT_MARKER) 423 if next_marker is None: 424 self._options.pop('marker', None) 425 return False 426 self._options['marker'] = next_marker 427 return True 428 429 def _find_elements(self, result, elements): 430 """Find interesting elements from XML. 431 432 This function tries to only look for specified elements 433 without parsing the entire XML. The specified elements is better 434 located near the beginning. 435 436 Args: 437 result: response XML. 438 elements: a set of interesting element tags. 439 440 Returns: 441 A dict from element tag to element value. 442 """ 443 element_mapping = {} 444 result = StringIO.StringIO(result) 445 for _, e in ET.iterparse(result, events=('end',)): 446 if not elements: 447 break 448 if e.tag in elements: 449 element_mapping[e.tag] = e.text 450 elements.remove(e.tag) 451 return element_mapping 452