# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

"""Python wrappers for the Google Storage RESTful API."""


__all__ = ['ReadBuffer',
           'StreamingBuffer',
           ]

import collections
import os
import urlparse

from . import api_utils
from . import common
from . import errors
from . import rest_api

try:
  from google.appengine.api import urlfetch
  from google.appengine.ext import ndb
except ImportError:
  from google.appengine.api import urlfetch
  from google.appengine.ext import ndb


def _get_storage_api(retry_params, account_id=None):
  """Returns storage_api instance for API methods.

  Args:
    retry_params: An instance of api_utils.RetryParams. If none,
      thread's default will be used.
    account_id: Internal-use only.

  Returns:
    A storage_api instance to handle urlfetch work to GCS.
    On dev appserver, this instance by default will talk to a local stub
    unless common.ACCESS_TOKEN is set. That token will be used to talk
    to the real GCS.
  """
  api = _StorageApi(_StorageApi.full_control_scope,
                    service_account_id=account_id,
                    retry_params=retry_params)
  if common.local_run() and not common.get_access_token():
    api.api_url = common.local_api_url()
  if common.get_access_token():
    api.token = common.get_access_token()
  return api
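
# Illustrative sketch (editorial, not part of the original module): callers
# inside this package are expected to obtain an API instance through
# _get_storage_api and then issue requests against quoted object paths.
# The literal path below is a placeholder.
#
#   api = _get_storage_api(retry_params=None)
#   status, headers, content = api.head_object('/mybucket/myfile')
#   # status is an int HTTP code, headers a dict, content the response body.
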
class _StorageApi(rest_api._RestApi):
  """A simple wrapper for the Google Storage RESTful API.

  WARNING: Do NOT directly use this api. It's an implementation detail
  and is subject to change at any release.

  All async methods have similar args and returns.

  Args:
    path: The path to the Google Storage object or bucket, e.g.
      '/mybucket/myfile' or '/mybucket'.
    **kwd: Options for urlfetch. e.g.
      headers={'content-type': 'text/plain'}, payload='blah'.

  Returns:
    A ndb Future. When fulfilled, future.get_result() should return
    a tuple of (status, headers, content) that represents a HTTP response
    of Google Cloud Storage XML API.
  """

  api_url = 'https://storage.googleapis.com'
  read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
  read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
  full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    Returns:
      A tuple (of dictionaries) with the state of this object
    """
    return (super(_StorageApi, self).__getstate__(),
            {'api_url': self.api_url})

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the tuple from a __getstate__ call
    """
    superstate, localstate = state
    super(_StorageApi, self).__setstate__(superstate)
    self.api_url = localstate['api_url']

  @api_utils._eager_tasklet
  @ndb.tasklet
  def do_request_async(self, url, method='GET', headers=None, payload=None,
                       deadline=None, callback=None):
    """Inherit docs.

    This method translates urlfetch exceptions to more service specific ones.
    """
    if headers is None:
      headers = {}
    if 'x-goog-api-version' not in headers:
      headers['x-goog-api-version'] = '2'
    headers['accept-encoding'] = 'gzip, *'
    try:
      resp_tuple = yield super(_StorageApi, self).do_request_async(
          url, method=method, headers=headers, payload=payload,
          deadline=deadline, callback=callback)
    except urlfetch.DownloadError, e:
      raise errors.TimeoutError(
          'Request to Google Cloud Storage timed out.', e)

    raise ndb.Return(resp_tuple)

  def post_object_async(self, path, **kwds):
    """POST to an object."""
    return self.do_request_async(self.api_url + path, 'POST', **kwds)

  def put_object_async(self, path, **kwds):
    """PUT an object."""
    return self.do_request_async(self.api_url + path, 'PUT', **kwds)

  def get_object_async(self, path, **kwds):
    """GET an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'GET', **kwds)

  def delete_object_async(self, path, **kwds):
    """DELETE an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'DELETE', **kwds)

  def head_object_async(self, path, **kwds):
    """HEAD an object.

    Depending on request headers, HEAD returns various object properties,
    e.g. Content-Length, Last-Modified, and ETag.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'HEAD', **kwds)

  def get_bucket_async(self, path, **kwds):
    """GET a bucket."""
    return self.do_request_async(self.api_url + path, 'GET', **kwds)


_StorageApi = rest_api.add_sync_methods(_StorageApi)
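
# Note (editorial, hedged): rest_api.add_sync_methods is expected to add a
# blocking counterpart for each *_async method defined above (e.g. get_object,
# head_object, post_object, put_object). ReadBuffer and StreamingBuffer below
# call those synchronous variants, for example self._api.head_object(path) in
# ReadBuffer.__init__ and self._api.post_object(path, ...) in
# StreamingBuffer.__init__.
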
class ReadBuffer(object):
  """A class for reading Google storage files."""

  DEFAULT_BUFFER_SIZE = 1024 * 1024
  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE

  def __init__(self,
               api,
               path,
               buffer_size=DEFAULT_BUFFER_SIZE,
               max_request_size=MAX_REQUEST_SIZE):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      buffer_size: buffer size. The ReadBuffer keeps
        one buffer. But there may be a pending future that contains
        a second buffer. This size must be no greater than max_request_size.
      max_request_size: Max bytes to request in one urlfetch.
    """
    self._api = api
    self._path = path
    self.name = api_utils._unquote_filename(path)
    self.closed = False

    assert buffer_size <= max_request_size
    self._buffer_size = buffer_size
    self._max_request_size = max_request_size
    self._offset = 0
    self._buffer = _Buffer()
    self._etag = None

    get_future = self._get_segment(0, self._buffer_size, check_response=False)

    status, headers, content = self._api.head_object(path)
    errors.check_status(status, [200], path, resp_headers=headers,
                        body=content)
    self._file_size = long(common.get_stored_content_length(headers))
    self._check_etag(headers.get('etag'))

    self._buffer_future = None

    if self._file_size != 0:
      content, check_response_closure = get_future.get_result()
      check_response_closure()
      self._buffer.reset(content)
      self._request_next_buffer()

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the read buffer are not stored, only the current offset
    for data read by the client. A new read buffer is established at
    unpickling. The head information for the object (file size and etag) is
    stored to reduce startup time and to ensure the file has not changed.

    Returns:
      A dictionary with the state of this object
    """
    return {'api': self._api,
            'path': self._path,
            'buffer_size': self._buffer_size,
            'request_size': self._max_request_size,
            'etag': self._etag,
            'size': self._file_size,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call

    Along with restoring the state, pre-fetch the next read buffer.
    """
    self._api = state['api']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)
    self._buffer_size = state['buffer_size']
    self._max_request_size = state['request_size']
    self._etag = state['etag']
    self._file_size = state['size']
    self._offset = state['offset']
    self._buffer = _Buffer()
    self.closed = state['closed']
    self._buffer_future = None
    if self._remaining() and not self.closed:
      self._request_next_buffer()

  def __iter__(self):
    """Iterator interface.

    Note the ReadBuffer container itself is the iterator. It's
    (quote PEP0234)
    'destructive: they consumes all the values and a second iterator
    cannot easily be created that iterates independently over the same
    values. You could open the file for the second time, or seek() to
    the beginning.'

    Returns:
      Self.
    """
    return self

  def next(self):
    line = self.readline()
    if not line:
      raise StopIteration()
    return line
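
  # Usage sketch (editorial; instances are normally created by the package's
  # file-open helper rather than constructed directly; 'handle' below is a
  # placeholder):
  #
  #   with ReadBuffer(api, '/mybucket/myfile') as f:
  #     for line in f:      # uses __iter__/next above, i.e. readline()
  #       handle(line)
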
  def readline(self, size=-1):
    """Read one line delimited by '\n' from the file.

    A trailing newline character is kept in the string. It may be absent when
    a file ends with an incomplete line. If the size argument is non-negative,
    it specifies the maximum string size (counting the newline) to return.
    A negative size is the same as unspecified. Empty string is returned
    only when EOF is encountered immediately.

    Args:
      size: Maximum number of bytes to read. If not specified, readline stops
        only on '\n' or EOF.

    Returns:
      The data read as a string.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if size == 0 or not self._remaining():
      return ''

    data_list = []
    newline_offset = self._buffer.find_newline(size)
    while newline_offset < 0:
      data = self._buffer.read(size)
      size -= len(data)
      self._offset += len(data)
      data_list.append(data)
      if size == 0 or not self._remaining():
        return ''.join(data_list)
      self._buffer.reset(self._buffer_future.get_result())
      self._request_next_buffer()
      newline_offset = self._buffer.find_newline(size)

    data = self._buffer.read_to_offset(newline_offset + 1)
    self._offset += len(data)
    data_list.append(data)

    return ''.join(data_list)

  def read(self, size=-1):
    """Read data from RAW file.

    Args:
      size: Number of bytes to read as integer. Actual number of bytes
        read is always equal to size unless EOF is reached. If size is
        negative or unspecified, read the entire file.

    Returns:
      data read as str.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if not self._remaining():
      return ''

    data_list = []
    while True:
      remaining = self._buffer.remaining()
      if size >= 0 and size < remaining:
        data_list.append(self._buffer.read(size))
        self._offset += size
        break
      else:
        size -= remaining
        self._offset += remaining
        data_list.append(self._buffer.read())

        if self._buffer_future is None:
          if size < 0 or size >= self._remaining():
            needs = self._remaining()
          else:
            needs = size
          data_list.extend(self._get_segments(self._offset, needs))
          self._offset += needs
          break

        if self._buffer_future:
          self._buffer.reset(self._buffer_future.get_result())
          self._buffer_future = None

    if self._buffer_future is None:
      self._request_next_buffer()
    return ''.join(data_list)

  def _remaining(self):
    return self._file_size - self._offset

  def _request_next_buffer(self):
    """Request next buffer.

    Requires self._offset and self._buffer are in consistent state.
    """
    self._buffer_future = None
    next_offset = self._offset + self._buffer.remaining()
    if next_offset != self._file_size:
      self._buffer_future = self._get_segment(next_offset,
                                              self._buffer_size)

  def _get_segments(self, start, request_size):
    """Get segments of the file from Google Storage as a list.

    A large request is broken into segments to avoid hitting the urlfetch
    response size limit. Each segment is returned from a separate urlfetch.

    Args:
      start: start offset to request. Inclusive. Has to be within the
        range of the file.
      request_size: number of bytes to request.

    Returns:
      A list of file segments in order.
    """
    if not request_size:
      return []

    end = start + request_size
    futures = []

    while request_size > self._max_request_size:
      futures.append(self._get_segment(start, self._max_request_size))
      request_size -= self._max_request_size
      start += self._max_request_size
    if start < end:
      futures.append(self._get_segment(start, end - start))
    return [fut.get_result() for fut in futures]
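
  # Example of the segmentation above (editorial): with the default
  # MAX_REQUEST_SIZE of 30 MB, a 70 MB read is issued as three range requests
  # of 30 MB, 30 MB and 10 MB, and _get_segments returns their contents in
  # order so the caller can simply join them.
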
  @ndb.tasklet
  def _get_segment(self, start, request_size, check_response=True):
    """Get a segment of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Has to be within the
        range of the file.
      request_size: number of bytes to request. Has to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.
      check_response: True to check the validity of GCS response automatically
        before the future returns. False otherwise. See Yields section.

    Yields:
      If check_response is True, the segment [start, start + request_size)
      of the file.
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks response. Caller should
      first invoke the closure before consuming the file segment.

    Raises:
      ValueError: if the file has changed while reading.
    """
    end = start + request_size - 1
    content_range = '%d-%d' % (start, end)
    headers = {'Range': 'bytes=' + content_range}
    status, resp_headers, content = yield self._api.get_object_async(
        self._path, headers=headers)
    def _checker():
      errors.check_status(status, [200, 206], self._path, headers,
                          resp_headers, body=content)
      self._check_etag(resp_headers.get('etag'))
    if check_response:
      _checker()
      raise ndb.Return(content)
    raise ndb.Return(content, _checker)

  def _check_etag(self, etag):
    """Check if etag is the same across requests to GCS.

    If self._etag is None, set it. If etag is set, check that the new
    etag equals the old one.

    In the __init__ method, we fire one HEAD and one GET request using
    ndb tasklet. One of them would return first and set the first value.

    Args:
      etag: etag from a GCS HTTP response. None if etag is not part of the
        response header. It could be None, for example, in the case of a GCS
        composite file.

    Raises:
      ValueError: if two etags are not equal.
    """
    if etag is None:
      return
    elif self._etag is None:
      self._etag = etag
    elif self._etag != etag:
      raise ValueError('File on GCS has changed while reading.')

  def close(self):
    self.closed = True
    self._buffer = None
    self._buffer_future = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current offset.

    Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Raises:
      IOError: When this buffer is closed.
      ValueError: When whence is invalid.
    """
    self._check_open()

    self._buffer.reset()
    self._buffer_future = None

    if whence == os.SEEK_SET:
      self._offset = offset
    elif whence == os.SEEK_CUR:
      self._offset += offset
    elif whence == os.SEEK_END:
      self._offset = self._file_size + offset
    else:
      raise ValueError('Whence mode %s is invalid.' % str(whence))

    self._offset = min(self._offset, self._file_size)
    self._offset = max(self._offset, 0)
    if self._remaining():
      self._request_next_buffer()
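
  # Example (editorial): seeking relative to EOF, e.g.
  # f.seek(-1024, os.SEEK_END), positions the offset 1024 bytes before the end
  # (clamped to 0 for smaller files) and pre-fetches the next buffer if any
  # bytes remain to be read.
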
  def tell(self):
    """Tell the file's current offset.

    Returns:
      current offset in reading this file.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    return self._offset

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return True

  def readable(self):
    return True

  def writable(self):
    return False


class _Buffer(object):
  """In memory buffer."""

  def __init__(self):
    self.reset()

  def reset(self, content='', offset=0):
    self._buffer = content
    self._offset = offset

  def read(self, size=-1):
    """Returns bytes from self._buffer and updates related offsets.

    Args:
      size: number of bytes to read starting from current offset.
        Read the entire buffer if negative.

    Returns:
      Requested bytes from buffer.
    """
    if size < 0:
      offset = len(self._buffer)
    else:
      offset = self._offset + size
    return self.read_to_offset(offset)

  def read_to_offset(self, offset):
    """Returns bytes from self._buffer and updates related offsets.

    Args:
      offset: read from current offset to this offset, exclusive.

    Returns:
      Requested bytes from buffer.
    """
    assert offset >= self._offset
    result = self._buffer[self._offset: offset]
    self._offset += len(result)
    return result

  def remaining(self):
    return len(self._buffer) - self._offset

  def find_newline(self, size=-1):
    """Search for newline char in buffer starting from current offset.

    Args:
      size: number of bytes to search. -1 means all.

    Returns:
      offset of newline char in buffer. -1 if it doesn't exist.
    """
    if size < 0:
      return self._buffer.find('\n', self._offset)
    return self._buffer.find('\n', self._offset, self._offset + size)
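
# Behavior sketch for _Buffer (editorial): after reset('ab\ncd'),
# find_newline() returns 2, read_to_offset(3) returns 'ab\n' and advances the
# internal offset to 3, and remaining() then reports 2.
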
class StreamingBuffer(object):
  """A class for creating large objects using the 'resumable' API.

  The API is a subset of the Python writable stream API sufficient to
  support writing zip files using the zipfile module.

  The exact sequence of calls and use of headers is documented at
  https://developers.google.com/storage/docs/developer-guide#unknownresumables
  """

  _blocksize = 256 * 1024

  _flushsize = 8 * _blocksize

  _maxrequestsize = 9 * 4 * _blocksize

  def __init__(self,
               api,
               path,
               content_type=None,
               gcs_headers=None):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      content_type: Optional content-type. If omitted, the choice is
        delegated to Google Cloud Storage.
      gcs_headers: additional gs headers as a str->str dict, e.g.
        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.

    Raises:
      IOError: When this location cannot be found.
    """
    assert self._maxrequestsize > self._blocksize
    assert self._maxrequestsize % self._blocksize == 0
    assert self._maxrequestsize >= self._flushsize

    self._api = api
    self._path = path

    self.name = api_utils._unquote_filename(path)
    self.closed = False

    self._buffer = collections.deque()
    self._buffered = 0
    self._written = 0
    self._offset = 0

    headers = {'x-goog-resumable': 'start'}
    if content_type:
      headers['content-type'] = content_type
    if gcs_headers:
      headers.update(gcs_headers)
    status, resp_headers, content = self._api.post_object(path,
                                                          headers=headers)
    errors.check_status(status, [201], path, headers, resp_headers,
                        body=content)
    loc = resp_headers.get('location')
    if not loc:
      raise IOError('No location header found in 201 response')
    parsed = urlparse.urlparse(loc)
    self._path_with_token = '%s?%s' % (self._path, parsed.query)

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the write buffer are stored. Writes to the underlying
    storage are required to be on block boundaries (_blocksize) except for the
    last write. In the worst case the pickled version of this object may be
    slightly larger than the blocksize.

    Returns:
      A dictionary with the state of this object
    """
    return {'api': self._api,
            'path': self._path,
            'path_token': self._path_with_token,
            'buffer': self._buffer,
            'buffered': self._buffered,
            'written': self._written,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call
    """
    self._api = state['api']
    self._path_with_token = state['path_token']
    self._buffer = state['buffer']
    self._buffered = state['buffered']
    self._written = state['written']
    self._offset = state['offset']
    self.closed = state['closed']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)

  def write(self, data):
    """Write some bytes.

    Args:
      data: data to write. str.

    Raises:
      TypeError: if data is not of type str.
    """
    self._check_open()
    if not isinstance(data, str):
      raise TypeError('Expected str but got %s.' % type(data))
    if not data:
      return
    self._buffer.append(data)
    self._buffered += len(data)
    self._offset += len(data)
    if self._buffered >= self._flushsize:
      self._flush()

  def flush(self):
    """Flush as much as possible to GCS.

    GCS *requires* that all writes except for the final one align on
    256KB boundaries. So the internal buffer may still have < 256KB bytes left
    after flush.
    """
    self._check_open()
    self._flush(finish=False)

  def tell(self):
    """Return the total number of bytes passed to write() so far.

    (There is no seek() method.)
    """
    return self._offset

  def close(self):
    """Flush the buffer and finalize the file.

    When this returns the new file is available for reading.
    """
    if not self.closed:
      self.closed = True
      self._flush(finish=True)
      self._buffer = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False
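
  # Usage sketch (editorial; instances are normally created by the package's
  # file-open helper in write mode rather than constructed directly):
  #
  #   with StreamingBuffer(api, '/mybucket/myfile',
  #                        content_type='text/plain') as f:
  #     f.write('hello ')
  #     f.write('world')
  #   # close() at the end of the with-block flushes and finalizes the object.
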
  def _flush(self, finish=False):
    """Internal API to flush.

    Buffer is flushed to GCS only when the total amount of buffered data is at
    least self._blocksize, or to flush the final (incomplete) block of
    the file with finish=True.
    """
    while ((finish and self._buffered >= 0) or
           (not finish and self._buffered >= self._blocksize)):
      tmp_buffer = []
      tmp_buffer_len = 0

      excess = 0
      while self._buffer:
        buf = self._buffer.popleft()
        size = len(buf)
        self._buffered -= size
        tmp_buffer.append(buf)
        tmp_buffer_len += size
        if tmp_buffer_len >= self._maxrequestsize:
          excess = tmp_buffer_len - self._maxrequestsize
          break
        if not finish and (
            tmp_buffer_len % self._blocksize + self._buffered <
            self._blocksize):
          excess = tmp_buffer_len % self._blocksize
          break

      if excess:
        over = tmp_buffer.pop()
        size = len(over)
        assert size >= excess
        tmp_buffer_len -= size
        head, tail = over[:-excess], over[-excess:]
        self._buffer.appendleft(tail)
        self._buffered += len(tail)
        if head:
          tmp_buffer.append(head)
          tmp_buffer_len += len(head)

      data = ''.join(tmp_buffer)
      file_len = '*'
      if finish and not self._buffered:
        file_len = self._written + len(data)
      self._send_data(data, self._written, file_len)
      self._written += len(data)
      if file_len != '*':
        break

  def _send_data(self, data, start_offset, file_len):
    """Send the block to the storage service.

    This is a utility method that does not modify self.

    Args:
      data: data to send in str.
      start_offset: start offset of the data in relation to the file.
      file_len: an int if this is the last data to append to the file.
        Otherwise '*'.
    """
    headers = {}
    end_offset = start_offset + len(data) - 1

    if data:
      headers['content-range'] = ('bytes %d-%d/%s' %
                                  (start_offset, end_offset, file_len))
    else:
      headers['content-range'] = ('bytes */%s' % file_len)

    status, response_headers, content = self._api.put_object(
        self._path_with_token, payload=data, headers=headers)
    if file_len == '*':
      expected = 308
    else:
      expected = 200
    errors.check_status(status, [expected], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})
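
  # Example of the headers produced above (editorial): an intermediate 256KB
  # block starting at offset 0 is sent with
  #   content-range: bytes 0-262143/*
  # and a 308 response is expected; the final block of a 300000-byte file
  # starting at offset 262144 is sent with
  #   content-range: bytes 262144-299999/300000
  # and a 200 response is expected.
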
  def _get_offset_from_gcs(self):
    """Get the last offset that has been written to GCS.

    This is a utility method that does not modify self.

    Returns:
      an int of the last offset written to GCS by this upload, inclusive.
      -1 means nothing has been written.
    """
    headers = {'content-range': 'bytes */*'}
    status, response_headers, content = self._api.put_object(
        self._path_with_token, headers=headers)
    errors.check_status(status, [308], self._path, headers,
                        response_headers, content,
                        {'upload_path': self._path_with_token})
    val = response_headers.get('range')
    if val is None:
      return -1
    _, offset = val.rsplit('-', 1)
    return int(offset)

  def _force_close(self, file_length=None):
    """Close this buffer on file_length.

    Finalize this upload immediately on file_length.
    Contents that are still in memory will not be uploaded.

    This is a utility method that does not modify self.

    Args:
      file_length: file length. Must match what has been uploaded. If None,
        it will be queried from GCS.
    """
    if file_length is None:
      file_length = self._get_offset_from_gcs() + 1
    self._send_data('', 0, file_length)

  def _check_open(self):
    if self.closed:
      raise IOError('Buffer is closed.')

  def seekable(self):
    return False

  def readable(self):
    return False

  def writable(self):
    return True