1#!/usr/bin/env python 2"""Upload and download support for apitools.""" 3from __future__ import print_function 4 5import email.generator as email_generator 6import email.mime.multipart as mime_multipart 7import email.mime.nonmultipart as mime_nonmultipart 8import io 9import json 10import mimetypes 11import os 12import threading 13 14import six 15from six.moves import http_client 16 17from apitools.base.py import buffered_stream 18from apitools.base.py import exceptions 19from apitools.base.py import http_wrapper 20from apitools.base.py import stream_slice 21from apitools.base.py import util 22 23__all__ = [ 24 'Download', 25 'Upload', 26 'RESUMABLE_UPLOAD', 27 'SIMPLE_UPLOAD', 28 'DownloadProgressPrinter', 29 'DownloadCompletePrinter', 30 'UploadProgressPrinter', 31 'UploadCompletePrinter', 32] 33 34_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 35SIMPLE_UPLOAD = 'simple' 36RESUMABLE_UPLOAD = 'resumable' 37 38 39def DownloadProgressPrinter(response, unused_download): 40 """Print download progress based on response.""" 41 if 'content-range' in response.info: 42 print('Received %s' % response.info['content-range']) 43 else: 44 print('Received %d bytes' % response.length) 45 46 47def DownloadCompletePrinter(unused_response, unused_download): 48 """Print information about a completed download.""" 49 print('Download complete') 50 51 52def UploadProgressPrinter(response, unused_upload): 53 """Print upload progress based on response.""" 54 print('Sent %s' % response.info['range']) 55 56 57def UploadCompletePrinter(unused_response, unused_upload): 58 """Print information about a completed upload.""" 59 print('Upload complete') 60 61 62class _Transfer(object): 63 64 """Generic bits common to Uploads and Downloads.""" 65 66 def __init__(self, stream, close_stream=False, chunksize=None, 67 auto_transfer=True, http=None, num_retries=5): 68 self.__bytes_http = None 69 self.__close_stream = close_stream 70 self.__http = http 71 self.__stream = stream 72 self.__url = None 73 74 self.__num_retries = 5 75 # Let the @property do validation 76 self.num_retries = num_retries 77 78 self.retry_func = ( 79 http_wrapper.HandleExceptionsAndRebuildHttpConnections) 80 self.auto_transfer = auto_transfer 81 self.chunksize = chunksize or 1048576 82 83 def __repr__(self): 84 return str(self) 85 86 @property 87 def close_stream(self): 88 return self.__close_stream 89 90 @property 91 def http(self): 92 return self.__http 93 94 @property 95 def bytes_http(self): 96 return self.__bytes_http or self.http 97 98 @bytes_http.setter 99 def bytes_http(self, value): 100 self.__bytes_http = value 101 102 @property 103 def num_retries(self): 104 return self.__num_retries 105 106 @num_retries.setter 107 def num_retries(self, value): 108 util.Typecheck(value, six.integer_types) 109 if value < 0: 110 raise exceptions.InvalidDataError( 111 'Cannot have negative value for num_retries') 112 self.__num_retries = value 113 114 @property 115 def stream(self): 116 return self.__stream 117 118 @property 119 def url(self): 120 return self.__url 121 122 def _Initialize(self, http, url): 123 """Initialize this download by setting self.http and self.url. 124 125 We want the user to be able to override self.http by having set 126 the value in the constructor; in that case, we ignore the provided 127 http. 128 129 Args: 130 http: An httplib2.Http instance or None. 131 url: The url for this transfer. 132 133 Returns: 134 None. Initializes self. 135 """ 136 self.EnsureUninitialized() 137 if self.http is None: 138 self.__http = http or http_wrapper.GetHttp() 139 self.__url = url 140 141 @property 142 def initialized(self): 143 return self.url is not None and self.http is not None 144 145 @property 146 def _type_name(self): 147 return type(self).__name__ 148 149 def EnsureInitialized(self): 150 if not self.initialized: 151 raise exceptions.TransferInvalidError( 152 'Cannot use uninitialized %s', self._type_name) 153 154 def EnsureUninitialized(self): 155 if self.initialized: 156 raise exceptions.TransferInvalidError( 157 'Cannot re-initialize %s', self._type_name) 158 159 def __del__(self): 160 if self.__close_stream: 161 self.__stream.close() 162 163 def _ExecuteCallback(self, callback, response): 164 # TODO(craigcitro): Push these into a queue. 165 if callback is not None: 166 threading.Thread(target=callback, args=(response, self)).start() 167 168 169class Download(_Transfer): 170 171 """Data for a single download. 172 173 Public attributes: 174 chunksize: default chunksize to use for transfers. 175 """ 176 _ACCEPTABLE_STATUSES = set(( 177 http_client.OK, 178 http_client.NO_CONTENT, 179 http_client.PARTIAL_CONTENT, 180 http_client.REQUESTED_RANGE_NOT_SATISFIABLE, 181 )) 182 _REQUIRED_SERIALIZATION_KEYS = set(( 183 'auto_transfer', 'progress', 'total_size', 'url')) 184 185 def __init__(self, stream, progress_callback=None, finish_callback=None, 186 **kwds): 187 total_size = kwds.pop('total_size', None) 188 super(Download, self).__init__(stream, **kwds) 189 self.__initial_response = None 190 self.__progress = 0 191 self.__total_size = total_size 192 self.__encoding = None 193 194 self.progress_callback = progress_callback 195 self.finish_callback = finish_callback 196 197 @property 198 def progress(self): 199 return self.__progress 200 201 @property 202 def encoding(self): 203 return self.__encoding 204 205 @classmethod 206 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): 207 """Create a new download object from a filename.""" 208 path = os.path.expanduser(filename) 209 if os.path.exists(path) and not overwrite: 210 raise exceptions.InvalidUserInputError( 211 'File %s exists and overwrite not specified' % path) 212 return cls(open(path, 'wb'), close_stream=True, 213 auto_transfer=auto_transfer, **kwds) 214 215 @classmethod 216 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): 217 """Create a new Download object from a stream.""" 218 return cls(stream, auto_transfer=auto_transfer, total_size=total_size, 219 **kwds) 220 221 @classmethod 222 def FromData(cls, stream, json_data, http=None, auto_transfer=None, 223 **kwds): 224 """Create a new Download object from a stream and serialized data.""" 225 info = json.loads(json_data) 226 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 227 if missing_keys: 228 raise exceptions.InvalidDataError( 229 'Invalid serialization data, missing keys: %s' % ( 230 ', '.join(missing_keys))) 231 download = cls.FromStream(stream, **kwds) 232 if auto_transfer is not None: 233 download.auto_transfer = auto_transfer 234 else: 235 download.auto_transfer = info['auto_transfer'] 236 setattr(download, '_Download__progress', info['progress']) 237 setattr(download, '_Download__total_size', info['total_size']) 238 download._Initialize( # pylint: disable=protected-access 239 http, info['url']) 240 return download 241 242 @property 243 def serialization_data(self): 244 self.EnsureInitialized() 245 return { 246 'auto_transfer': self.auto_transfer, 247 'progress': self.progress, 248 'total_size': self.total_size, 249 'url': self.url, 250 } 251 252 @property 253 def total_size(self): 254 return self.__total_size 255 256 def __str__(self): 257 if not self.initialized: 258 return 'Download (uninitialized)' 259 else: 260 return 'Download with %d/%s bytes transferred from url %s' % ( 261 self.progress, self.total_size, self.url) 262 263 def ConfigureRequest(self, http_request, url_builder): 264 url_builder.query_params['alt'] = 'media' 265 # TODO(craigcitro): We need to send range requests because by 266 # default httplib2 stores entire reponses in memory. Override 267 # httplib2's download method (as gsutil does) so that this is not 268 # necessary. 269 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) 270 271 def __SetTotal(self, info): 272 if 'content-range' in info: 273 _, _, total = info['content-range'].rpartition('/') 274 if total != '*': 275 self.__total_size = int(total) 276 # Note "total_size is None" means we don't know it; if no size 277 # info was returned on our initial range request, that means we 278 # have a 0-byte file. (That last statement has been verified 279 # empirically, but is not clearly documented anywhere.) 280 if self.total_size is None: 281 self.__total_size = 0 282 283 def InitializeDownload(self, http_request, http=None, client=None): 284 """Initialize this download by making a request. 285 286 Args: 287 http_request: The HttpRequest to use to initialize this download. 288 http: The httplib2.Http instance for this request. 289 client: If provided, let this client process the final URL before 290 sending any additional requests. If client is provided and 291 http is not, client.http will be used instead. 292 """ 293 self.EnsureUninitialized() 294 if http is None and client is None: 295 raise exceptions.UserError('Must provide client or http.') 296 http = http or client.http 297 if client is not None: 298 http_request.url = client.FinalizeTransferUrl(http_request.url) 299 url = http_request.url 300 if self.auto_transfer: 301 end_byte = self.__ComputeEndByte(0) 302 self.__SetRangeHeader(http_request, 0, end_byte) 303 response = http_wrapper.MakeRequest( 304 self.bytes_http or http, http_request) 305 if response.status_code not in self._ACCEPTABLE_STATUSES: 306 raise exceptions.HttpError.FromResponse(response) 307 self.__initial_response = response 308 self.__SetTotal(response.info) 309 url = response.info.get('content-location', response.request_url) 310 if client is not None: 311 url = client.FinalizeTransferUrl(url) 312 self._Initialize(http, url) 313 # Unless the user has requested otherwise, we want to just 314 # go ahead and pump the bytes now. 315 if self.auto_transfer: 316 self.StreamInChunks() 317 318 def __NormalizeStartEnd(self, start, end=None): 319 if end is not None: 320 if start < 0: 321 raise exceptions.TransferInvalidError( 322 'Cannot have end index with negative start index') 323 elif start >= self.total_size: 324 raise exceptions.TransferInvalidError( 325 'Cannot have start index greater than total size') 326 end = min(end, self.total_size - 1) 327 if end < start: 328 raise exceptions.TransferInvalidError( 329 'Range requested with end[%s] < start[%s]' % (end, start)) 330 return start, end 331 else: 332 if start < 0: 333 start = max(0, start + self.total_size) 334 return start, self.total_size - 1 335 336 def __SetRangeHeader(self, request, start, end=None): 337 if start < 0: 338 request.headers['range'] = 'bytes=%d' % start 339 elif end is None: 340 request.headers['range'] = 'bytes=%d-' % start 341 else: 342 request.headers['range'] = 'bytes=%d-%d' % (start, end) 343 344 def __ComputeEndByte(self, start, end=None, use_chunks=True): 345 """Compute the last byte to fetch for this request. 346 347 This is all based on the HTTP spec for Range and 348 Content-Range. 349 350 Note that this is potentially confusing in several ways: 351 * the value for the last byte is 0-based, eg "fetch 10 bytes 352 from the beginning" would return 9 here. 353 * if we have no information about size, and don't want to 354 use the chunksize, we'll return None. 355 See the tests for more examples. 356 357 Args: 358 start: byte to start at. 359 end: (int or None, default: None) Suggested last byte. 360 use_chunks: (bool, default: True) If False, ignore self.chunksize. 361 362 Returns: 363 Last byte to use in a Range header, or None. 364 365 """ 366 end_byte = end 367 368 if start < 0 and not self.total_size: 369 return end_byte 370 371 if use_chunks: 372 alternate = start + self.chunksize - 1 373 if end_byte is not None: 374 end_byte = min(end_byte, alternate) 375 else: 376 end_byte = alternate 377 378 if self.total_size: 379 alternate = self.total_size - 1 380 if end_byte is not None: 381 end_byte = min(end_byte, alternate) 382 else: 383 end_byte = alternate 384 385 return end_byte 386 387 def __GetChunk(self, start, end, additional_headers=None): 388 """Retrieve a chunk, and return the full response.""" 389 self.EnsureInitialized() 390 request = http_wrapper.Request(url=self.url) 391 self.__SetRangeHeader(request, start, end=end) 392 if additional_headers is not None: 393 request.headers.update(additional_headers) 394 return http_wrapper.MakeRequest( 395 self.bytes_http, request, retry_func=self.retry_func, 396 retries=self.num_retries) 397 398 def __ProcessResponse(self, response): 399 """Process response (by updating self and writing to self.stream).""" 400 if response.status_code not in self._ACCEPTABLE_STATUSES: 401 # We distinguish errors that mean we made a mistake in setting 402 # up the transfer versus something we should attempt again. 403 if response.status_code in (http_client.FORBIDDEN, 404 http_client.NOT_FOUND): 405 raise exceptions.HttpError.FromResponse(response) 406 else: 407 raise exceptions.TransferRetryError(response.content) 408 if response.status_code in (http_client.OK, 409 http_client.PARTIAL_CONTENT): 410 self.stream.write(response.content) 411 self.__progress += response.length 412 if response.info and 'content-encoding' in response.info: 413 # TODO(craigcitro): Handle the case where this changes over a 414 # download. 415 self.__encoding = response.info['content-encoding'] 416 elif response.status_code == http_client.NO_CONTENT: 417 # It's important to write something to the stream for the case 418 # of a 0-byte download to a file, as otherwise python won't 419 # create the file. 420 self.stream.write('') 421 return response 422 423 def GetRange(self, start, end=None, additional_headers=None, 424 use_chunks=True): 425 """Retrieve a given byte range from this download, inclusive. 426 427 Range must be of one of these three forms: 428 * 0 <= start, end = None: Fetch from start to the end of the file. 429 * 0 <= start <= end: Fetch the bytes from start to end. 430 * start < 0, end = None: Fetch the last -start bytes of the file. 431 432 (These variations correspond to those described in the HTTP 1.1 433 protocol for range headers in RFC 2616, sec. 14.35.1.) 434 435 Args: 436 start: (int) Where to start fetching bytes. (See above.) 437 end: (int, optional) Where to stop fetching bytes. (See above.) 438 additional_headers: (bool, optional) Any additional headers to 439 pass with the request. 440 use_chunks: (bool, default: True) If False, ignore self.chunksize 441 and fetch this range in a single request. 442 443 Returns: 444 None. Streams bytes into self.stream. 445 """ 446 self.EnsureInitialized() 447 progress_end_normalized = False 448 if self.total_size is not None: 449 progress, end_byte = self.__NormalizeStartEnd(start, end) 450 progress_end_normalized = True 451 else: 452 progress = start 453 end_byte = end 454 while (not progress_end_normalized or end_byte is None or 455 progress <= end_byte): 456 end_byte = self.__ComputeEndByte(progress, end=end_byte, 457 use_chunks=use_chunks) 458 response = self.__GetChunk(progress, end_byte, 459 additional_headers=additional_headers) 460 if not progress_end_normalized: 461 self.__SetTotal(response.info) 462 progress, end_byte = self.__NormalizeStartEnd(start, end) 463 progress_end_normalized = True 464 response = self.__ProcessResponse(response) 465 progress += response.length 466 if response.length == 0: 467 raise exceptions.TransferRetryError( 468 'Zero bytes unexpectedly returned in download response') 469 470 def StreamInChunks(self, callback=None, finish_callback=None, 471 additional_headers=None): 472 """Stream the entire download in chunks.""" 473 self.StreamMedia(callback=callback, finish_callback=finish_callback, 474 additional_headers=additional_headers, 475 use_chunks=True) 476 477 def StreamMedia(self, callback=None, finish_callback=None, 478 additional_headers=None, use_chunks=True): 479 """Stream the entire download. 480 481 Args: 482 callback: (default: None) Callback to call as each chunk is 483 completed. 484 finish_callback: (default: None) Callback to call when the 485 download is complete. 486 additional_headers: (default: None) Additional headers to 487 include in fetching bytes. 488 use_chunks: (bool, default: True) If False, ignore self.chunksize 489 and stream this download in a single request. 490 491 Returns: 492 None. Streams bytes into self.stream. 493 """ 494 callback = callback or self.progress_callback 495 finish_callback = finish_callback or self.finish_callback 496 497 self.EnsureInitialized() 498 while True: 499 if self.__initial_response is not None: 500 response = self.__initial_response 501 self.__initial_response = None 502 else: 503 end_byte = self.__ComputeEndByte(self.progress, 504 use_chunks=use_chunks) 505 response = self.__GetChunk( 506 self.progress, end_byte, 507 additional_headers=additional_headers) 508 if self.total_size is None: 509 self.__SetTotal(response.info) 510 response = self.__ProcessResponse(response) 511 self._ExecuteCallback(callback, response) 512 if (response.status_code == http_client.OK or 513 self.progress >= self.total_size): 514 break 515 self._ExecuteCallback(finish_callback, response) 516 517 518class Upload(_Transfer): 519 520 """Data for a single Upload. 521 522 Fields: 523 stream: The stream to upload. 524 mime_type: MIME type of the upload. 525 total_size: (optional) Total upload size for the stream. 526 close_stream: (default: False) Whether or not we should close the 527 stream when finished with the upload. 528 auto_transfer: (default: True) If True, stream all bytes as soon as 529 the upload is created. 530 """ 531 _REQUIRED_SERIALIZATION_KEYS = set(( 532 'auto_transfer', 'mime_type', 'total_size', 'url')) 533 534 def __init__(self, stream, mime_type, total_size=None, http=None, 535 close_stream=False, chunksize=None, auto_transfer=True, 536 progress_callback=None, finish_callback=None, 537 **kwds): 538 super(Upload, self).__init__( 539 stream, close_stream=close_stream, chunksize=chunksize, 540 auto_transfer=auto_transfer, http=http, **kwds) 541 self.__complete = False 542 self.__final_response = None 543 self.__mime_type = mime_type 544 self.__progress = 0 545 self.__server_chunk_granularity = None 546 self.__strategy = None 547 self.__total_size = None 548 549 self.progress_callback = progress_callback 550 self.finish_callback = finish_callback 551 self.total_size = total_size 552 553 @property 554 def progress(self): 555 return self.__progress 556 557 @classmethod 558 def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds): 559 """Create a new Upload object from a filename.""" 560 path = os.path.expanduser(filename) 561 if not os.path.exists(path): 562 raise exceptions.NotFoundError('Could not find file %s' % path) 563 if not mime_type: 564 mime_type, _ = mimetypes.guess_type(path) 565 if mime_type is None: 566 raise exceptions.InvalidUserInputError( 567 'Could not guess mime type for %s' % path) 568 size = os.stat(path).st_size 569 return cls(open(path, 'rb'), mime_type, total_size=size, 570 close_stream=True, auto_transfer=auto_transfer, **kwds) 571 572 @classmethod 573 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True, 574 **kwds): 575 """Create a new Upload object from a stream.""" 576 if mime_type is None: 577 raise exceptions.InvalidUserInputError( 578 'No mime_type specified for stream') 579 return cls(stream, mime_type, total_size=total_size, 580 close_stream=False, auto_transfer=auto_transfer, **kwds) 581 582 @classmethod 583 def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds): 584 """Create a new Upload of stream from serialized json_data and http.""" 585 info = json.loads(json_data) 586 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) 587 if missing_keys: 588 raise exceptions.InvalidDataError( 589 'Invalid serialization data, missing keys: %s' % ( 590 ', '.join(missing_keys))) 591 if 'total_size' in kwds: 592 raise exceptions.InvalidUserInputError( 593 'Cannot override total_size on serialized Upload') 594 upload = cls.FromStream(stream, info['mime_type'], 595 total_size=info.get('total_size'), **kwds) 596 if isinstance(stream, io.IOBase) and not stream.seekable(): 597 raise exceptions.InvalidUserInputError( 598 'Cannot restart resumable upload on non-seekable stream') 599 if auto_transfer is not None: 600 upload.auto_transfer = auto_transfer 601 else: 602 upload.auto_transfer = info['auto_transfer'] 603 upload.strategy = RESUMABLE_UPLOAD 604 upload._Initialize( # pylint: disable=protected-access 605 http, info['url']) 606 upload.RefreshResumableUploadState() 607 upload.EnsureInitialized() 608 if upload.auto_transfer: 609 upload.StreamInChunks() 610 return upload 611 612 @property 613 def serialization_data(self): 614 self.EnsureInitialized() 615 if self.strategy != RESUMABLE_UPLOAD: 616 raise exceptions.InvalidDataError( 617 'Serialization only supported for resumable uploads') 618 return { 619 'auto_transfer': self.auto_transfer, 620 'mime_type': self.mime_type, 621 'total_size': self.total_size, 622 'url': self.url, 623 } 624 625 @property 626 def complete(self): 627 return self.__complete 628 629 @property 630 def mime_type(self): 631 return self.__mime_type 632 633 def __str__(self): 634 if not self.initialized: 635 return 'Upload (uninitialized)' 636 else: 637 return 'Upload with %d/%s bytes transferred for url %s' % ( 638 self.progress, self.total_size or '???', self.url) 639 640 @property 641 def strategy(self): 642 return self.__strategy 643 644 @strategy.setter 645 def strategy(self, value): 646 if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): 647 raise exceptions.UserError(( 648 'Invalid value "%s" for upload strategy, must be one of ' 649 '"simple" or "resumable".') % value) 650 self.__strategy = value 651 652 @property 653 def total_size(self): 654 return self.__total_size 655 656 @total_size.setter 657 def total_size(self, value): 658 self.EnsureUninitialized() 659 self.__total_size = value 660 661 def __SetDefaultUploadStrategy(self, upload_config, http_request): 662 """Determine and set the default upload strategy for this upload. 663 664 We generally prefer simple or multipart, unless we're forced to 665 use resumable. This happens when any of (1) the upload is too 666 large, (2) the simple endpoint doesn't support multipart requests 667 and we have metadata, or (3) there is no simple upload endpoint. 668 669 Args: 670 upload_config: Configuration for the upload endpoint. 671 http_request: The associated http request. 672 673 Returns: 674 None. 675 """ 676 if upload_config.resumable_path is None: 677 self.strategy = SIMPLE_UPLOAD 678 if self.strategy is not None: 679 return 680 strategy = SIMPLE_UPLOAD 681 if (self.total_size is not None and 682 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): 683 strategy = RESUMABLE_UPLOAD 684 if http_request.body and not upload_config.simple_multipart: 685 strategy = RESUMABLE_UPLOAD 686 if not upload_config.simple_path: 687 strategy = RESUMABLE_UPLOAD 688 self.strategy = strategy 689 690 def ConfigureRequest(self, upload_config, http_request, url_builder): 691 """Configure the request and url for this upload.""" 692 # Validate total_size vs. max_size 693 if (self.total_size and upload_config.max_size and 694 self.total_size > upload_config.max_size): 695 raise exceptions.InvalidUserInputError( 696 'Upload too big: %s larger than max size %s' % ( 697 self.total_size, upload_config.max_size)) 698 # Validate mime type 699 if not util.AcceptableMimeType(upload_config.accept, self.mime_type): 700 raise exceptions.InvalidUserInputError( 701 'MIME type %s does not match any accepted MIME ranges %s' % ( 702 self.mime_type, upload_config.accept)) 703 704 self.__SetDefaultUploadStrategy(upload_config, http_request) 705 if self.strategy == SIMPLE_UPLOAD: 706 url_builder.relative_path = upload_config.simple_path 707 if http_request.body: 708 url_builder.query_params['uploadType'] = 'multipart' 709 self.__ConfigureMultipartRequest(http_request) 710 else: 711 url_builder.query_params['uploadType'] = 'media' 712 self.__ConfigureMediaRequest(http_request) 713 else: 714 url_builder.relative_path = upload_config.resumable_path 715 url_builder.query_params['uploadType'] = 'resumable' 716 self.__ConfigureResumableRequest(http_request) 717 718 def __ConfigureMediaRequest(self, http_request): 719 """Configure http_request as a simple request for this upload.""" 720 http_request.headers['content-type'] = self.mime_type 721 http_request.body = self.stream.read() 722 http_request.loggable_body = '<media body>' 723 724 def __ConfigureMultipartRequest(self, http_request): 725 """Configure http_request as a multipart request for this upload.""" 726 # This is a multipart/related upload. 727 msg_root = mime_multipart.MIMEMultipart('related') 728 # msg_root should not write out its own headers 729 setattr(msg_root, '_write_headers', lambda self: None) 730 731 # attach the body as one part 732 msg = mime_nonmultipart.MIMENonMultipart( 733 *http_request.headers['content-type'].split('/')) 734 msg.set_payload(http_request.body) 735 msg_root.attach(msg) 736 737 # attach the media as the second part 738 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) 739 msg['Content-Transfer-Encoding'] = 'binary' 740 msg.set_payload(self.stream.read()) 741 msg_root.attach(msg) 742 743 # NOTE: We encode the body, but can't use 744 # `email.message.Message.as_string` because it prepends 745 # `> ` to `From ` lines. 746 # NOTE: We must use six.StringIO() instead of io.StringIO() since the 747 # `email` library uses cStringIO in Py2 and io.StringIO in Py3. 748 fp = six.StringIO() 749 g = email_generator.Generator(fp, mangle_from_=False) 750 g.flatten(msg_root, unixfrom=False) 751 http_request.body = fp.getvalue() 752 753 multipart_boundary = msg_root.get_boundary() 754 http_request.headers['content-type'] = ( 755 'multipart/related; boundary=%r' % multipart_boundary) 756 757 body_components = http_request.body.split(multipart_boundary) 758 headers, _, _ = body_components[-2].partition('\n\n') 759 body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--']) 760 http_request.loggable_body = multipart_boundary.join(body_components) 761 762 def __ConfigureResumableRequest(self, http_request): 763 http_request.headers['X-Upload-Content-Type'] = self.mime_type 764 if self.total_size is not None: 765 http_request.headers[ 766 'X-Upload-Content-Length'] = str(self.total_size) 767 768 def RefreshResumableUploadState(self): 769 """Talk to the server and refresh the state of this resumable upload. 770 771 Returns: 772 Response if the upload is complete. 773 """ 774 if self.strategy != RESUMABLE_UPLOAD: 775 return 776 self.EnsureInitialized() 777 refresh_request = http_wrapper.Request( 778 url=self.url, http_method='PUT', 779 headers={'Content-Range': 'bytes */*'}) 780 refresh_response = http_wrapper.MakeRequest( 781 self.http, refresh_request, redirections=0, 782 retries=self.num_retries) 783 range_header = self._GetRangeHeaderFromResponse(refresh_response) 784 if refresh_response.status_code in (http_client.OK, 785 http_client.CREATED): 786 self.__complete = True 787 self.__progress = self.total_size 788 self.stream.seek(self.progress) 789 # If we're finished, the refresh response will contain the metadata 790 # originally requested. Cache it so it can be returned in 791 # StreamInChunks. 792 self.__final_response = refresh_response 793 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: 794 if range_header is None: 795 self.__progress = 0 796 else: 797 self.__progress = self.__GetLastByte(range_header) + 1 798 self.stream.seek(self.progress) 799 else: 800 raise exceptions.HttpError.FromResponse(refresh_response) 801 802 def _GetRangeHeaderFromResponse(self, response): 803 return response.info.get('Range', response.info.get('range')) 804 805 def InitializeUpload(self, http_request, http=None, client=None): 806 """Initialize this upload from the given http_request.""" 807 if self.strategy is None: 808 raise exceptions.UserError( 809 'No upload strategy set; did you call ConfigureRequest?') 810 if http is None and client is None: 811 raise exceptions.UserError('Must provide client or http.') 812 if self.strategy != RESUMABLE_UPLOAD: 813 return 814 http = http or client.http 815 if client is not None: 816 http_request.url = client.FinalizeTransferUrl(http_request.url) 817 self.EnsureUninitialized() 818 http_response = http_wrapper.MakeRequest(http, http_request, 819 retries=self.num_retries) 820 if http_response.status_code != http_client.OK: 821 raise exceptions.HttpError.FromResponse(http_response) 822 823 self.__server_chunk_granularity = http_response.info.get( 824 'X-Goog-Upload-Chunk-Granularity') 825 url = http_response.info['location'] 826 if client is not None: 827 url = client.FinalizeTransferUrl(url) 828 self._Initialize(http, url) 829 830 # Unless the user has requested otherwise, we want to just 831 # go ahead and pump the bytes now. 832 if self.auto_transfer: 833 return self.StreamInChunks() 834 835 def __GetLastByte(self, range_header): 836 _, _, end = range_header.partition('-') 837 # TODO(craigcitro): Validate start == 0? 838 return int(end) 839 840 def __ValidateChunksize(self, chunksize=None): 841 if self.__server_chunk_granularity is None: 842 return 843 chunksize = chunksize or self.chunksize 844 if chunksize % self.__server_chunk_granularity: 845 raise exceptions.ConfigurationValueError( 846 'Server requires chunksize to be a multiple of %d', 847 self.__server_chunk_granularity) 848 849 def __StreamMedia(self, callback=None, finish_callback=None, 850 additional_headers=None, use_chunks=True): 851 """Helper function for StreamMedia / StreamInChunks.""" 852 if self.strategy != RESUMABLE_UPLOAD: 853 raise exceptions.InvalidUserInputError( 854 'Cannot stream non-resumable upload') 855 callback = callback or self.progress_callback 856 finish_callback = finish_callback or self.finish_callback 857 # final_response is set if we resumed an already-completed upload. 858 response = self.__final_response 859 send_func = self.__SendChunk if use_chunks else self.__SendMediaBody 860 if use_chunks: 861 self.__ValidateChunksize(self.chunksize) 862 self.EnsureInitialized() 863 while not self.complete: 864 response = send_func(self.stream.tell(), 865 additional_headers=additional_headers) 866 if response.status_code in (http_client.OK, http_client.CREATED): 867 self.__complete = True 868 break 869 self.__progress = self.__GetLastByte(response.info['range']) 870 if self.progress + 1 != self.stream.tell(): 871 # TODO(craigcitro): Add a better way to recover here. 872 raise exceptions.CommunicationError( 873 'Failed to transfer all bytes in chunk, upload paused at ' 874 'byte %d' % self.progress) 875 self._ExecuteCallback(callback, response) 876 if self.__complete and hasattr(self.stream, 'seek'): 877 current_pos = self.stream.tell() 878 self.stream.seek(0, os.SEEK_END) 879 end_pos = self.stream.tell() 880 self.stream.seek(current_pos) 881 if current_pos != end_pos: 882 raise exceptions.TransferInvalidError( 883 'Upload complete with %s additional bytes left in stream' % 884 (int(end_pos) - int(current_pos))) 885 self._ExecuteCallback(finish_callback, response) 886 return response 887 888 def StreamMedia(self, callback=None, finish_callback=None, 889 additional_headers=None): 890 """Send this resumable upload in a single request. 891 892 Args: 893 callback: Progress callback function with inputs 894 (http_wrapper.Response, transfer.Upload) 895 finish_callback: Final callback function with inputs 896 (http_wrapper.Response, transfer.Upload) 897 additional_headers: Dict of headers to include with the upload 898 http_wrapper.Request. 899 900 Returns: 901 http_wrapper.Response of final response. 902 """ 903 return self.__StreamMedia( 904 callback=callback, finish_callback=finish_callback, 905 additional_headers=additional_headers, use_chunks=False) 906 907 def StreamInChunks(self, callback=None, finish_callback=None, 908 additional_headers=None): 909 """Send this (resumable) upload in chunks.""" 910 return self.__StreamMedia( 911 callback=callback, finish_callback=finish_callback, 912 additional_headers=additional_headers) 913 914 def __SendMediaRequest(self, request, end): 915 """Request helper function for SendMediaBody & SendChunk.""" 916 response = http_wrapper.MakeRequest( 917 self.bytes_http, request, retry_func=self.retry_func, 918 retries=self.num_retries) 919 if response.status_code not in (http_client.OK, http_client.CREATED, 920 http_wrapper.RESUME_INCOMPLETE): 921 # We want to reset our state to wherever the server left us 922 # before this failed request, and then raise. 923 self.RefreshResumableUploadState() 924 raise exceptions.HttpError.FromResponse(response) 925 if response.status_code == http_wrapper.RESUME_INCOMPLETE: 926 last_byte = self.__GetLastByte( 927 self._GetRangeHeaderFromResponse(response)) 928 if last_byte + 1 != end: 929 self.stream.seek(last_byte) 930 return response 931 932 def __SendMediaBody(self, start, additional_headers=None): 933 """Send the entire media stream in a single request.""" 934 self.EnsureInitialized() 935 if self.total_size is None: 936 raise exceptions.TransferInvalidError( 937 'Total size must be known for SendMediaBody') 938 body_stream = stream_slice.StreamSlice( 939 self.stream, self.total_size - start) 940 941 request = http_wrapper.Request(url=self.url, http_method='PUT', 942 body=body_stream) 943 request.headers['Content-Type'] = self.mime_type 944 if start == self.total_size: 945 # End of an upload with 0 bytes left to send; just finalize. 946 range_string = 'bytes */%s' % self.total_size 947 else: 948 range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1, 949 self.total_size) 950 951 request.headers['Content-Range'] = range_string 952 if additional_headers: 953 request.headers.update(additional_headers) 954 955 return self.__SendMediaRequest(request, self.total_size) 956 957 def __SendChunk(self, start, additional_headers=None): 958 """Send the specified chunk.""" 959 self.EnsureInitialized() 960 no_log_body = self.total_size is None 961 if self.total_size is None: 962 # For the streaming resumable case, we need to detect when 963 # we're at the end of the stream. 964 body_stream = buffered_stream.BufferedStream( 965 self.stream, start, self.chunksize) 966 end = body_stream.stream_end_position 967 if body_stream.stream_exhausted: 968 self.__total_size = end 969 # TODO: Here, change body_stream from a stream to a string object, 970 # which means reading a chunk into memory. This works around 971 # https://code.google.com/p/httplib2/issues/detail?id=176 which can 972 # cause httplib2 to skip bytes on 401's for file objects. 973 # Rework this solution to be more general. 974 body_stream = body_stream.read(self.chunksize) 975 else: 976 end = min(start + self.chunksize, self.total_size) 977 body_stream = stream_slice.StreamSlice(self.stream, end - start) 978 # TODO(craigcitro): Think about clearer errors on "no data in 979 # stream". 980 request = http_wrapper.Request(url=self.url, http_method='PUT', 981 body=body_stream) 982 request.headers['Content-Type'] = self.mime_type 983 if no_log_body: 984 # Disable logging of streaming body. 985 # TODO: Remove no_log_body and rework as part of a larger logs 986 # refactor. 987 request.loggable_body = '<media body>' 988 if self.total_size is None: 989 # Streaming resumable upload case, unknown total size. 990 range_string = 'bytes %s-%s/*' % (start, end - 1) 991 elif end == start: 992 # End of an upload with 0 bytes left to send; just finalize. 993 range_string = 'bytes */%s' % self.total_size 994 else: 995 # Normal resumable upload case with known sizes. 996 range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size) 997 998 request.headers['Content-Range'] = range_string 999 if additional_headers: 1000 request.headers.update(additional_headers) 1001 1002 return self.__SendMediaRequest(request, end) 1003