1#!/usr/bin/env python
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Upload and download support for apitools."""
18from __future__ import print_function
19
20import email.generator as email_generator
21import email.mime.multipart as mime_multipart
22import email.mime.nonmultipart as mime_nonmultipart
23import io
24import json
25import mimetypes
26import os
27import threading
28
29import six
30from six.moves import http_client
31
32from apitools.base.py import buffered_stream
33from apitools.base.py import compression
34from apitools.base.py import exceptions
35from apitools.base.py import http_wrapper
36from apitools.base.py import stream_slice
37from apitools.base.py import util
38
39__all__ = [
40    'Download',
41    'Upload',
42    'RESUMABLE_UPLOAD',
43    'SIMPLE_UPLOAD',
44    'DownloadProgressPrinter',
45    'DownloadCompletePrinter',
46    'UploadProgressPrinter',
47    'UploadCompletePrinter',
48]
49
50_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20
51SIMPLE_UPLOAD = 'simple'
52RESUMABLE_UPLOAD = 'resumable'
53
54
55def DownloadProgressPrinter(response, unused_download):
56    """Print download progress based on response."""
57    if 'content-range' in response.info:
58        print('Received %s' % response.info['content-range'])
59    else:
60        print('Received %d bytes' % response.length)
61
62
63def DownloadCompletePrinter(unused_response, unused_download):
64    """Print information about a completed download."""
65    print('Download complete')
66
67
68def UploadProgressPrinter(response, unused_upload):
69    """Print upload progress based on response."""
70    print('Sent %s' % response.info['range'])
71
72
73def UploadCompletePrinter(unused_response, unused_upload):
74    """Print information about a completed upload."""
75    print('Upload complete')
76
77
78class _Transfer(object):
79
80    """Generic bits common to Uploads and Downloads."""
81
82    def __init__(self, stream, close_stream=False, chunksize=None,
83                 auto_transfer=True, http=None, num_retries=5):
84        self.__bytes_http = None
85        self.__close_stream = close_stream
86        self.__http = http
87        self.__stream = stream
88        self.__url = None
89
90        self.__num_retries = 5
91        # Let the @property do validation
92        self.num_retries = num_retries
93
94        self.retry_func = (
95            http_wrapper.HandleExceptionsAndRebuildHttpConnections)
96        self.auto_transfer = auto_transfer
97        self.chunksize = chunksize or 1048576
98
99    def __repr__(self):
100        return str(self)
101
102    @property
103    def close_stream(self):
104        return self.__close_stream
105
106    @property
107    def http(self):
108        return self.__http
109
110    @property
111    def bytes_http(self):
112        return self.__bytes_http or self.http
113
114    @bytes_http.setter
115    def bytes_http(self, value):
116        self.__bytes_http = value
117
118    @property
119    def num_retries(self):
120        return self.__num_retries
121
122    @num_retries.setter
123    def num_retries(self, value):
124        util.Typecheck(value, six.integer_types)
125        if value < 0:
126            raise exceptions.InvalidDataError(
127                'Cannot have negative value for num_retries')
128        self.__num_retries = value
129
130    @property
131    def stream(self):
132        return self.__stream
133
134    @property
135    def url(self):
136        return self.__url
137
138    def _Initialize(self, http, url):
139        """Initialize this download by setting self.http and self.url.
140
141        We want the user to be able to override self.http by having set
142        the value in the constructor; in that case, we ignore the provided
143        http.
144
145        Args:
146          http: An httplib2.Http instance or None.
147          url: The url for this transfer.
148
149        Returns:
150          None. Initializes self.
151        """
152        self.EnsureUninitialized()
153        if self.http is None:
154            self.__http = http or http_wrapper.GetHttp()
155        self.__url = url
156
157    @property
158    def initialized(self):
159        return self.url is not None and self.http is not None
160
161    @property
162    def _type_name(self):
163        return type(self).__name__
164
165    def EnsureInitialized(self):
166        if not self.initialized:
167            raise exceptions.TransferInvalidError(
168                'Cannot use uninitialized %s' % self._type_name)
169
170    def EnsureUninitialized(self):
171        if self.initialized:
172            raise exceptions.TransferInvalidError(
173                'Cannot re-initialize %s' % self._type_name)
174
175    def __del__(self):
176        if self.__close_stream:
177            self.__stream.close()
178
179    def _ExecuteCallback(self, callback, response):
180        # TODO(craigcitro): Push these into a queue.
181        if callback is not None:
182            threading.Thread(target=callback, args=(response, self)).start()
183
184
185class Download(_Transfer):
186
187    """Data for a single download.
188
189    Public attributes:
190      chunksize: default chunksize to use for transfers.
191    """
192    _ACCEPTABLE_STATUSES = set((
193        http_client.OK,
194        http_client.NO_CONTENT,
195        http_client.PARTIAL_CONTENT,
196        http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
197    ))
198    _REQUIRED_SERIALIZATION_KEYS = set((
199        'auto_transfer', 'progress', 'total_size', 'url'))
200
201    def __init__(self, stream, progress_callback=None, finish_callback=None,
202                 **kwds):
203        total_size = kwds.pop('total_size', None)
204        super(Download, self).__init__(stream, **kwds)
205        self.__initial_response = None
206        self.__progress = 0
207        self.__total_size = total_size
208        self.__encoding = None
209
210        self.progress_callback = progress_callback
211        self.finish_callback = finish_callback
212
213    @property
214    def progress(self):
215        return self.__progress
216
217    @property
218    def encoding(self):
219        return self.__encoding
220
221    @classmethod
222    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
223        """Create a new download object from a filename."""
224        path = os.path.expanduser(filename)
225        if os.path.exists(path) and not overwrite:
226            raise exceptions.InvalidUserInputError(
227                'File %s exists and overwrite not specified' % path)
228        return cls(open(path, 'wb'), close_stream=True,
229                   auto_transfer=auto_transfer, **kwds)
230
231    @classmethod
232    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
233        """Create a new Download object from a stream."""
234        return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
235                   **kwds)
236
237    @classmethod
238    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
239                 **kwds):
240        """Create a new Download object from a stream and serialized data."""
241        info = json.loads(json_data)
242        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
243        if missing_keys:
244            raise exceptions.InvalidDataError(
245                'Invalid serialization data, missing keys: %s' % (
246                    ', '.join(missing_keys)))
247        download = cls.FromStream(stream, **kwds)
248        if auto_transfer is not None:
249            download.auto_transfer = auto_transfer
250        else:
251            download.auto_transfer = info['auto_transfer']
252        setattr(download, '_Download__progress', info['progress'])
253        setattr(download, '_Download__total_size', info['total_size'])
254        download._Initialize(  # pylint: disable=protected-access
255            http, info['url'])
256        return download
257
258    @property
259    def serialization_data(self):
260        self.EnsureInitialized()
261        return {
262            'auto_transfer': self.auto_transfer,
263            'progress': self.progress,
264            'total_size': self.total_size,
265            'url': self.url,
266        }
267
268    @property
269    def total_size(self):
270        return self.__total_size
271
272    def __str__(self):
273        if not self.initialized:
274            return 'Download (uninitialized)'
275        return 'Download with %d/%s bytes transferred from url %s' % (
276            self.progress, self.total_size, self.url)
277
278    def ConfigureRequest(self, http_request, url_builder):
279        url_builder.query_params['alt'] = 'media'
280        # TODO(craigcitro): We need to send range requests because by
281        # default httplib2 stores entire reponses in memory. Override
282        # httplib2's download method (as gsutil does) so that this is not
283        # necessary.
284        http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)
285
286    def __SetTotal(self, info):
287        """Sets the total size based off info if possible otherwise 0."""
288        if 'content-range' in info:
289            _, _, total = info['content-range'].rpartition('/')
290            if total != '*':
291                self.__total_size = int(total)
292        # Note "total_size is None" means we don't know it; if no size
293        # info was returned on our initial range request, that means we
294        # have a 0-byte file. (That last statement has been verified
295        # empirically, but is not clearly documented anywhere.)
296        if self.total_size is None:
297            self.__total_size = 0
298
299    def InitializeDownload(self, http_request, http=None, client=None):
300        """Initialize this download by making a request.
301
302        Args:
303          http_request: The HttpRequest to use to initialize this download.
304          http: The httplib2.Http instance for this request.
305          client: If provided, let this client process the final URL before
306              sending any additional requests. If client is provided and
307              http is not, client.http will be used instead.
308        """
309        self.EnsureUninitialized()
310        if http is None and client is None:
311            raise exceptions.UserError('Must provide client or http.')
312        http = http or client.http
313        if client is not None:
314            http_request.url = client.FinalizeTransferUrl(http_request.url)
315        url = http_request.url
316        if self.auto_transfer:
317            end_byte = self.__ComputeEndByte(0)
318            self.__SetRangeHeader(http_request, 0, end_byte)
319            response = http_wrapper.MakeRequest(
320                self.bytes_http or http, http_request)
321            if response.status_code not in self._ACCEPTABLE_STATUSES:
322                raise exceptions.HttpError.FromResponse(response)
323            self.__initial_response = response
324            self.__SetTotal(response.info)
325            url = response.info.get('content-location', response.request_url)
326        if client is not None:
327            url = client.FinalizeTransferUrl(url)
328        self._Initialize(http, url)
329        # Unless the user has requested otherwise, we want to just
330        # go ahead and pump the bytes now.
331        if self.auto_transfer:
332            self.StreamInChunks()
333
334    def __NormalizeStartEnd(self, start, end=None):
335        """Normalizes start and end values based on total size."""
336        if end is not None:
337            if start < 0:
338                raise exceptions.TransferInvalidError(
339                    'Cannot have end index with negative start index ' +
340                    '[start=%d, end=%d]' % (start, end))
341            elif start >= self.total_size:
342                raise exceptions.TransferInvalidError(
343                    'Cannot have start index greater than total size ' +
344                    '[start=%d, total_size=%d]' % (start, self.total_size))
345            end = min(end, self.total_size - 1)
346            if end < start:
347                raise exceptions.TransferInvalidError(
348                    'Range requested with end[%s] < start[%s]' % (end, start))
349            return start, end
350        else:
351            if start < 0:
352                start = max(0, start + self.total_size)
353            return start, self.total_size - 1
354
355    def __SetRangeHeader(self, request, start, end=None):
356        if start < 0:
357            request.headers['range'] = 'bytes=%d' % start
358        elif end is None or end < start:
359            request.headers['range'] = 'bytes=%d-' % start
360        else:
361            request.headers['range'] = 'bytes=%d-%d' % (start, end)
362
363    def __ComputeEndByte(self, start, end=None, use_chunks=True):
364        """Compute the last byte to fetch for this request.
365
366        This is all based on the HTTP spec for Range and
367        Content-Range.
368
369        Note that this is potentially confusing in several ways:
370          * the value for the last byte is 0-based, eg "fetch 10 bytes
371            from the beginning" would return 9 here.
372          * if we have no information about size, and don't want to
373            use the chunksize, we'll return None.
374        See the tests for more examples.
375
376        Args:
377          start: byte to start at.
378          end: (int or None, default: None) Suggested last byte.
379          use_chunks: (bool, default: True) If False, ignore self.chunksize.
380
381        Returns:
382          Last byte to use in a Range header, or None.
383
384        """
385        end_byte = end
386
387        if start < 0 and not self.total_size:
388            return end_byte
389
390        if use_chunks:
391            alternate = start + self.chunksize - 1
392            if end_byte is not None:
393                end_byte = min(end_byte, alternate)
394            else:
395                end_byte = alternate
396
397        if self.total_size:
398            alternate = self.total_size - 1
399            if end_byte is not None:
400                end_byte = min(end_byte, alternate)
401            else:
402                end_byte = alternate
403
404        return end_byte
405
406    def __GetChunk(self, start, end, additional_headers=None):
407        """Retrieve a chunk, and return the full response."""
408        self.EnsureInitialized()
409        request = http_wrapper.Request(url=self.url)
410        self.__SetRangeHeader(request, start, end=end)
411        if additional_headers is not None:
412            request.headers.update(additional_headers)
413        return http_wrapper.MakeRequest(
414            self.bytes_http, request, retry_func=self.retry_func,
415            retries=self.num_retries)
416
417    def __ProcessResponse(self, response):
418        """Process response (by updating self and writing to self.stream)."""
419        if response.status_code not in self._ACCEPTABLE_STATUSES:
420            # We distinguish errors that mean we made a mistake in setting
421            # up the transfer versus something we should attempt again.
422            if response.status_code in (http_client.FORBIDDEN,
423                                        http_client.NOT_FOUND):
424                raise exceptions.HttpError.FromResponse(response)
425            else:
426                raise exceptions.TransferRetryError(response.content)
427        if response.status_code in (http_client.OK,
428                                    http_client.PARTIAL_CONTENT):
429            try:
430                self.stream.write(six.ensure_binary(response.content))
431            except TypeError:
432                self.stream.write(six.ensure_text(response.content))
433            self.__progress += response.length
434            if response.info and 'content-encoding' in response.info:
435                # TODO(craigcitro): Handle the case where this changes over a
436                # download.
437                self.__encoding = response.info['content-encoding']
438        elif response.status_code == http_client.NO_CONTENT:
439            # It's important to write something to the stream for the case
440            # of a 0-byte download to a file, as otherwise python won't
441            # create the file.
442            self.stream.write('')
443        return response
444
445    def GetRange(self, start, end=None, additional_headers=None,
446                 use_chunks=True):
447        """Retrieve a given byte range from this download, inclusive.
448
449        Range must be of one of these three forms:
450        * 0 <= start, end = None: Fetch from start to the end of the file.
451        * 0 <= start <= end: Fetch the bytes from start to end.
452        * start < 0, end = None: Fetch the last -start bytes of the file.
453
454        (These variations correspond to those described in the HTTP 1.1
455        protocol for range headers in RFC 2616, sec. 14.35.1.)
456
457        Args:
458          start: (int) Where to start fetching bytes. (See above.)
459          end: (int, optional) Where to stop fetching bytes. (See above.)
460          additional_headers: (bool, optional) Any additional headers to
461              pass with the request.
462          use_chunks: (bool, default: True) If False, ignore self.chunksize
463              and fetch this range in a single request.
464
465        Returns:
466          None. Streams bytes into self.stream.
467        """
468        self.EnsureInitialized()
469        progress_end_normalized = False
470        if self.total_size is not None:
471            progress, end_byte = self.__NormalizeStartEnd(start, end)
472            progress_end_normalized = True
473        else:
474            progress = start
475            end_byte = end
476        while (not progress_end_normalized or end_byte is None or
477               progress <= end_byte):
478            end_byte = self.__ComputeEndByte(progress, end=end_byte,
479                                             use_chunks=use_chunks)
480            response = self.__GetChunk(progress, end_byte,
481                                       additional_headers=additional_headers)
482            if not progress_end_normalized:
483                self.__SetTotal(response.info)
484                progress, end_byte = self.__NormalizeStartEnd(start, end)
485                progress_end_normalized = True
486            response = self.__ProcessResponse(response)
487            progress += response.length
488            if response.length == 0:
489                if response.status_code == http_client.OK:
490                    # There can legitimately be no Content-Length header sent
491                    # in some cases (e.g., when there's a Transfer-Encoding
492                    # header) and if this was a 200 response (as opposed to
493                    # 206 Partial Content) we know we're done now without
494                    # looping further on received length.
495                    return
496                raise exceptions.TransferRetryError(
497                    'Zero bytes unexpectedly returned in download response')
498
499    def StreamInChunks(self, callback=None, finish_callback=None,
500                       additional_headers=None):
501        """Stream the entire download in chunks."""
502        self.StreamMedia(callback=callback, finish_callback=finish_callback,
503                         additional_headers=additional_headers,
504                         use_chunks=True)
505
506    def StreamMedia(self, callback=None, finish_callback=None,
507                    additional_headers=None, use_chunks=True):
508        """Stream the entire download.
509
510        Args:
511          callback: (default: None) Callback to call as each chunk is
512              completed.
513          finish_callback: (default: None) Callback to call when the
514              download is complete.
515          additional_headers: (default: None) Additional headers to
516              include in fetching bytes.
517          use_chunks: (bool, default: True) If False, ignore self.chunksize
518              and stream this download in a single request.
519
520        Returns:
521            None. Streams bytes into self.stream.
522        """
523        callback = callback or self.progress_callback
524        finish_callback = finish_callback or self.finish_callback
525
526        self.EnsureInitialized()
527        while True:
528            if self.__initial_response is not None:
529                response = self.__initial_response
530                self.__initial_response = None
531            else:
532                end_byte = self.__ComputeEndByte(self.progress,
533                                                 use_chunks=use_chunks)
534                response = self.__GetChunk(
535                    self.progress, end_byte,
536                    additional_headers=additional_headers)
537            if self.total_size is None:
538                self.__SetTotal(response.info)
539            response = self.__ProcessResponse(response)
540            self._ExecuteCallback(callback, response)
541            if (response.status_code == http_client.OK or
542                    self.progress >= self.total_size):
543                break
544        self._ExecuteCallback(finish_callback, response)
545
546
547if six.PY3:
548    class MultipartBytesGenerator(email_generator.BytesGenerator):
549        """Generates a bytes Message object tree for multipart messages
550
551        This is a BytesGenerator that has been modified to not attempt line
552        termination character modification in the bytes payload. Known to
553        work with the compat32 policy only. It may work on others, but not
554        tested. The outfp object must accept bytes in its write method.
555        """
556        def _handle_text(self, msg):
557            # If the string has surrogates the original source was bytes, so
558            # just write it back out.
559            if msg._payload is None:
560                return
561            self.write(msg._payload)
562
563        # Default body handler
564        _writeBody = _handle_text
565
566
567class Upload(_Transfer):
568
569    """Data for a single Upload.
570
571    Fields:
572      stream: The stream to upload.
573      mime_type: MIME type of the upload.
574      total_size: (optional) Total upload size for the stream.
575      close_stream: (default: False) Whether or not we should close the
576          stream when finished with the upload.
577      auto_transfer: (default: True) If True, stream all bytes as soon as
578          the upload is created.
579    """
580    _REQUIRED_SERIALIZATION_KEYS = set((
581        'auto_transfer', 'mime_type', 'total_size', 'url'))
582
583    def __init__(self, stream, mime_type, total_size=None, http=None,
584                 close_stream=False, chunksize=None, auto_transfer=True,
585                 progress_callback=None, finish_callback=None,
586                 gzip_encoded=False, **kwds):
587        super(Upload, self).__init__(
588            stream, close_stream=close_stream, chunksize=chunksize,
589            auto_transfer=auto_transfer, http=http, **kwds)
590        self.__complete = False
591        self.__final_response = None
592        self.__mime_type = mime_type
593        self.__progress = 0
594        self.__server_chunk_granularity = None
595        self.__strategy = None
596        self.__total_size = None
597        self.__gzip_encoded = gzip_encoded
598
599        self.progress_callback = progress_callback
600        self.finish_callback = finish_callback
601        self.total_size = total_size
602
603    @property
604    def progress(self):
605        return self.__progress
606
607    @classmethod
608    def FromFile(cls, filename, mime_type=None, auto_transfer=True,
609                 gzip_encoded=False, **kwds):
610        """Create a new Upload object from a filename."""
611        path = os.path.expanduser(filename)
612        if not os.path.exists(path):
613            raise exceptions.NotFoundError('Could not find file %s' % path)
614        if not mime_type:
615            mime_type, _ = mimetypes.guess_type(path)
616            if mime_type is None:
617                raise exceptions.InvalidUserInputError(
618                    'Could not guess mime type for %s' % path)
619        size = os.stat(path).st_size
620        return cls(open(path, 'rb'), mime_type, total_size=size,
621                   close_stream=True, auto_transfer=auto_transfer,
622                   gzip_encoded=gzip_encoded, **kwds)
623
624    @classmethod
625    def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
626                   gzip_encoded=False, **kwds):
627        """Create a new Upload object from a stream."""
628        if mime_type is None:
629            raise exceptions.InvalidUserInputError(
630                'No mime_type specified for stream')
631        return cls(stream, mime_type, total_size=total_size,
632                   close_stream=False, auto_transfer=auto_transfer,
633                   gzip_encoded=gzip_encoded, **kwds)
634
635    @classmethod
636    def FromData(cls, stream, json_data, http, auto_transfer=None,
637                 gzip_encoded=False, **kwds):
638        """Create a new Upload of stream from serialized json_data and http."""
639        info = json.loads(json_data)
640        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
641        if missing_keys:
642            raise exceptions.InvalidDataError(
643                'Invalid serialization data, missing keys: %s' % (
644                    ', '.join(missing_keys)))
645        if 'total_size' in kwds:
646            raise exceptions.InvalidUserInputError(
647                'Cannot override total_size on serialized Upload')
648        upload = cls.FromStream(stream, info['mime_type'],
649                                total_size=info.get('total_size'),
650                                gzip_encoded=gzip_encoded, **kwds)
651        if isinstance(stream, io.IOBase) and not stream.seekable():
652            raise exceptions.InvalidUserInputError(
653                'Cannot restart resumable upload on non-seekable stream')
654        if auto_transfer is not None:
655            upload.auto_transfer = auto_transfer
656        else:
657            upload.auto_transfer = info['auto_transfer']
658        upload.strategy = RESUMABLE_UPLOAD
659        upload._Initialize(  # pylint: disable=protected-access
660            http, info['url'])
661        upload.RefreshResumableUploadState()
662        upload.EnsureInitialized()
663        if upload.auto_transfer:
664            upload.StreamInChunks()
665        return upload
666
667    @property
668    def serialization_data(self):
669        self.EnsureInitialized()
670        if self.strategy != RESUMABLE_UPLOAD:
671            raise exceptions.InvalidDataError(
672                'Serialization only supported for resumable uploads')
673        return {
674            'auto_transfer': self.auto_transfer,
675            'mime_type': self.mime_type,
676            'total_size': self.total_size,
677            'url': self.url,
678        }
679
680    @property
681    def complete(self):
682        return self.__complete
683
684    @property
685    def mime_type(self):
686        return self.__mime_type
687
688    def __str__(self):
689        if not self.initialized:
690            return 'Upload (uninitialized)'
691        return 'Upload with %d/%s bytes transferred for url %s' % (
692            self.progress, self.total_size or '???', self.url)
693
694    @property
695    def strategy(self):
696        return self.__strategy
697
698    @strategy.setter
699    def strategy(self, value):
700        if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
701            raise exceptions.UserError((
702                'Invalid value "%s" for upload strategy, must be one of '
703                '"simple" or "resumable".') % value)
704        self.__strategy = value
705
706    @property
707    def total_size(self):
708        return self.__total_size
709
710    @total_size.setter
711    def total_size(self, value):
712        self.EnsureUninitialized()
713        self.__total_size = value
714
715    def __SetDefaultUploadStrategy(self, upload_config, http_request):
716        """Determine and set the default upload strategy for this upload.
717
718        We generally prefer simple or multipart, unless we're forced to
719        use resumable. This happens when any of (1) the upload is too
720        large, (2) the simple endpoint doesn't support multipart requests
721        and we have metadata, or (3) there is no simple upload endpoint.
722
723        Args:
724          upload_config: Configuration for the upload endpoint.
725          http_request: The associated http request.
726
727        Returns:
728          None.
729        """
730        if upload_config.resumable_path is None:
731            self.strategy = SIMPLE_UPLOAD
732        if self.strategy is not None:
733            return
734        strategy = SIMPLE_UPLOAD
735        if (self.total_size is not None and
736                self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
737            strategy = RESUMABLE_UPLOAD
738        if http_request.body and not upload_config.simple_multipart:
739            strategy = RESUMABLE_UPLOAD
740        if not upload_config.simple_path:
741            strategy = RESUMABLE_UPLOAD
742        self.strategy = strategy
743
744    def ConfigureRequest(self, upload_config, http_request, url_builder):
745        """Configure the request and url for this upload."""
746        # Validate total_size vs. max_size
747        if (self.total_size and upload_config.max_size and
748                self.total_size > upload_config.max_size):
749            raise exceptions.InvalidUserInputError(
750                'Upload too big: %s larger than max size %s' % (
751                    self.total_size, upload_config.max_size))
752        # Validate mime type
753        if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
754            raise exceptions.InvalidUserInputError(
755                'MIME type %s does not match any accepted MIME ranges %s' % (
756                    self.mime_type, upload_config.accept))
757
758        self.__SetDefaultUploadStrategy(upload_config, http_request)
759        if self.strategy == SIMPLE_UPLOAD:
760            url_builder.relative_path = upload_config.simple_path
761            if http_request.body:
762                url_builder.query_params['uploadType'] = 'multipart'
763                self.__ConfigureMultipartRequest(http_request)
764            else:
765                url_builder.query_params['uploadType'] = 'media'
766                self.__ConfigureMediaRequest(http_request)
767            # Once the entire body is written, compress the body if configured
768            # to. Both multipart and media request uploads will read the
769            # entire stream into memory, which means full compression is also
770            # safe to perform. Because the strategy is set to SIMPLE_UPLOAD,
771            # StreamInChunks throws an exception, meaning double compression
772            # cannot happen.
773            if self.__gzip_encoded:
774                http_request.headers['Content-Encoding'] = 'gzip'
775                # Turn the body into a stream so that we can compress it, then
776                # read the compressed bytes.  In the event of a retry (e.g. if
777                # our access token has expired), we need to be able to re-read
778                # the body, which we can't do with a stream. So, we consume the
779                # bytes from the stream now and store them in a re-readable
780                # bytes container.
781                http_request.body = (
782                    compression.CompressStream(
783                        six.BytesIO(http_request.body))[0].read())
784        else:
785            url_builder.relative_path = upload_config.resumable_path
786            url_builder.query_params['uploadType'] = 'resumable'
787            self.__ConfigureResumableRequest(http_request)
788
789    def __ConfigureMediaRequest(self, http_request):
790        """Configure http_request as a simple request for this upload."""
791        http_request.headers['content-type'] = self.mime_type
792        http_request.body = self.stream.read()
793        http_request.loggable_body = '<media body>'
794
795    def __ConfigureMultipartRequest(self, http_request):
796        """Configure http_request as a multipart request for this upload."""
797        # This is a multipart/related upload.
798        msg_root = mime_multipart.MIMEMultipart('related')
799        # msg_root should not write out its own headers
800        setattr(msg_root, '_write_headers', lambda self: None)
801
802        # attach the body as one part
803        msg = mime_nonmultipart.MIMENonMultipart(
804            *http_request.headers['content-type'].split('/'))
805        msg.set_payload(http_request.body)
806        msg_root.attach(msg)
807
808        # attach the media as the second part
809        msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
810        msg['Content-Transfer-Encoding'] = 'binary'
811        msg.set_payload(self.stream.read())
812        msg_root.attach(msg)
813
814        # NOTE: We encode the body, but can't use
815        #       `email.message.Message.as_string` because it prepends
816        #       `> ` to `From ` lines.
817        fp = six.BytesIO()
818        if six.PY3:
819            generator_class = MultipartBytesGenerator
820        else:
821            generator_class = email_generator.Generator
822        g = generator_class(fp, mangle_from_=False)
823        g.flatten(msg_root, unixfrom=False)
824        http_request.body = fp.getvalue()
825
826        multipart_boundary = msg_root.get_boundary()
827        http_request.headers['content-type'] = (
828            'multipart/related; boundary=%r' % multipart_boundary)
829        if isinstance(multipart_boundary, six.text_type):
830            multipart_boundary = multipart_boundary.encode('ascii')
831
832        body_components = http_request.body.split(multipart_boundary)
833        headers, _, _ = body_components[-2].partition(b'\n\n')
834        body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--'])
835        http_request.loggable_body = multipart_boundary.join(body_components)
836
837    def __ConfigureResumableRequest(self, http_request):
838        http_request.headers['X-Upload-Content-Type'] = self.mime_type
839        if self.total_size is not None:
840            http_request.headers[
841                'X-Upload-Content-Length'] = str(self.total_size)
842
843    def RefreshResumableUploadState(self):
844        """Talk to the server and refresh the state of this resumable upload.
845
846        Returns:
847          Response if the upload is complete.
848        """
849        if self.strategy != RESUMABLE_UPLOAD:
850            return
851        self.EnsureInitialized()
852        refresh_request = http_wrapper.Request(
853            url=self.url, http_method='PUT',
854            headers={'Content-Range': 'bytes */*'})
855        refresh_response = http_wrapper.MakeRequest(
856            self.http, refresh_request, redirections=0,
857            retries=self.num_retries)
858        range_header = self._GetRangeHeaderFromResponse(refresh_response)
859        if refresh_response.status_code in (http_client.OK,
860                                            http_client.CREATED):
861            self.__complete = True
862            self.__progress = self.total_size
863            self.stream.seek(self.progress)
864            # If we're finished, the refresh response will contain the metadata
865            # originally requested. Cache it so it can be returned in
866            # StreamInChunks.
867            self.__final_response = refresh_response
868        elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
869            if range_header is None:
870                self.__progress = 0
871            else:
872                self.__progress = self.__GetLastByte(range_header) + 1
873            self.stream.seek(self.progress)
874        else:
875            raise exceptions.HttpError.FromResponse(refresh_response)
876
877    def _GetRangeHeaderFromResponse(self, response):
878        return response.info.get('Range', response.info.get('range'))
879
880    def InitializeUpload(self, http_request, http=None, client=None):
881        """Initialize this upload from the given http_request."""
882        if self.strategy is None:
883            raise exceptions.UserError(
884                'No upload strategy set; did you call ConfigureRequest?')
885        if http is None and client is None:
886            raise exceptions.UserError('Must provide client or http.')
887        if self.strategy != RESUMABLE_UPLOAD:
888            return
889        http = http or client.http
890        if client is not None:
891            http_request.url = client.FinalizeTransferUrl(http_request.url)
892        self.EnsureUninitialized()
893        http_response = http_wrapper.MakeRequest(http, http_request,
894                                                 retries=self.num_retries)
895        if http_response.status_code != http_client.OK:
896            raise exceptions.HttpError.FromResponse(http_response)
897
898        self.__server_chunk_granularity = http_response.info.get(
899            'X-Goog-Upload-Chunk-Granularity')
900        url = http_response.info['location']
901        if client is not None:
902            url = client.FinalizeTransferUrl(url)
903        self._Initialize(http, url)
904
905        # Unless the user has requested otherwise, we want to just
906        # go ahead and pump the bytes now.
907        if self.auto_transfer:
908            return self.StreamInChunks()
909        return http_response
910
911    def __GetLastByte(self, range_header):
912        _, _, end = range_header.partition('-')
913        # TODO(craigcitro): Validate start == 0?
914        return int(end)
915
916    def __ValidateChunksize(self, chunksize=None):
917        if self.__server_chunk_granularity is None:
918            return
919        chunksize = chunksize or self.chunksize
920        if chunksize % self.__server_chunk_granularity:
921            raise exceptions.ConfigurationValueError(
922                'Server requires chunksize to be a multiple of %d' %
923                self.__server_chunk_granularity)
924
925    def __IsRetryable(self, response):
926        return (response.status_code >= 500 or
927                response.status_code == http_wrapper.TOO_MANY_REQUESTS or
928                response.retry_after)
929
930    def __StreamMedia(self, callback=None, finish_callback=None,
931                      additional_headers=None, use_chunks=True):
932        """Helper function for StreamMedia / StreamInChunks."""
933        if self.strategy != RESUMABLE_UPLOAD:
934            raise exceptions.InvalidUserInputError(
935                'Cannot stream non-resumable upload')
936        callback = callback or self.progress_callback
937        finish_callback = finish_callback or self.finish_callback
938        # final_response is set if we resumed an already-completed upload.
939        response = self.__final_response
940
941        def CallSendChunk(start):
942            return self.__SendChunk(
943                start, additional_headers=additional_headers)
944
945        def CallSendMediaBody(start):
946            return self.__SendMediaBody(
947                start, additional_headers=additional_headers)
948
949        send_func = CallSendChunk if use_chunks else CallSendMediaBody
950        if not use_chunks and self.__gzip_encoded:
951            raise exceptions.InvalidUserInputError(
952                'Cannot gzip encode non-chunked upload')
953        if use_chunks:
954            self.__ValidateChunksize(self.chunksize)
955        self.EnsureInitialized()
956        while not self.complete:
957            response = send_func(self.stream.tell())
958            if response.status_code in (http_client.OK, http_client.CREATED):
959                self.__complete = True
960                break
961            if response.status_code not in (
962                    http_client.OK, http_client.CREATED,
963                    http_wrapper.RESUME_INCOMPLETE):
964                # Only raise an exception if the error is something we can't
965                # recover from.
966                if (self.strategy != RESUMABLE_UPLOAD or
967                        not self.__IsRetryable(response)):
968                    raise exceptions.HttpError.FromResponse(response)
969                # We want to reset our state to wherever the server left us
970                # before this failed request, and then raise.
971                self.RefreshResumableUploadState()
972
973                self._ExecuteCallback(callback, response)
974                continue
975
976            self.__progress = self.__GetLastByte(
977                self._GetRangeHeaderFromResponse(response))
978            if self.progress + 1 != self.stream.tell():
979                # TODO(craigcitro): Add a better way to recover here.
980                raise exceptions.CommunicationError(
981                    'Failed to transfer all bytes in chunk, upload paused at '
982                    'byte %d' % self.progress)
983            self._ExecuteCallback(callback, response)
984        if self.__complete and hasattr(self.stream, 'seek'):
985            current_pos = self.stream.tell()
986            self.stream.seek(0, os.SEEK_END)
987            end_pos = self.stream.tell()
988            self.stream.seek(current_pos)
989            if current_pos != end_pos:
990                raise exceptions.TransferInvalidError(
991                    'Upload complete with %s additional bytes left in stream' %
992                    (int(end_pos) - int(current_pos)))
993        self._ExecuteCallback(finish_callback, response)
994        return response
995
996    def StreamMedia(self, callback=None, finish_callback=None,
997                    additional_headers=None):
998        """Send this resumable upload in a single request.
999
1000        Args:
1001          callback: Progress callback function with inputs
1002              (http_wrapper.Response, transfer.Upload)
1003          finish_callback: Final callback function with inputs
1004              (http_wrapper.Response, transfer.Upload)
1005          additional_headers: Dict of headers to include with the upload
1006              http_wrapper.Request.
1007
1008        Returns:
1009          http_wrapper.Response of final response.
1010        """
1011        return self.__StreamMedia(
1012            callback=callback, finish_callback=finish_callback,
1013            additional_headers=additional_headers, use_chunks=False)
1014
1015    def StreamInChunks(self, callback=None, finish_callback=None,
1016                       additional_headers=None):
1017        """Send this (resumable) upload in chunks."""
1018        return self.__StreamMedia(
1019            callback=callback, finish_callback=finish_callback,
1020            additional_headers=additional_headers)
1021
1022    def __SendMediaRequest(self, request, end):
1023        """Request helper function for SendMediaBody & SendChunk."""
1024        def CheckResponse(response):
1025            if response is None:
1026                # Caller shouldn't call us if the response is None,
1027                # but handle anyway.
1028                raise exceptions.RequestError(
1029                    'Request to url %s did not return a response.' %
1030                    response.request_url)
1031        response = http_wrapper.MakeRequest(
1032            self.bytes_http, request, retry_func=self.retry_func,
1033            retries=self.num_retries, check_response_func=CheckResponse)
1034        if response.status_code == http_wrapper.RESUME_INCOMPLETE:
1035            last_byte = self.__GetLastByte(
1036                self._GetRangeHeaderFromResponse(response))
1037            if last_byte + 1 != end:
1038                self.stream.seek(last_byte + 1)
1039        return response
1040
1041    def __SendMediaBody(self, start, additional_headers=None):
1042        """Send the entire media stream in a single request."""
1043        self.EnsureInitialized()
1044        if self.total_size is None:
1045            raise exceptions.TransferInvalidError(
1046                'Total size must be known for SendMediaBody')
1047        body_stream = stream_slice.StreamSlice(
1048            self.stream, self.total_size - start)
1049
1050        request = http_wrapper.Request(url=self.url, http_method='PUT',
1051                                       body=body_stream)
1052        request.headers['Content-Type'] = self.mime_type
1053        if start == self.total_size:
1054            # End of an upload with 0 bytes left to send; just finalize.
1055            range_string = 'bytes */%s' % self.total_size
1056        else:
1057            range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1,
1058                                               self.total_size)
1059
1060        request.headers['Content-Range'] = range_string
1061        if additional_headers:
1062            request.headers.update(additional_headers)
1063
1064        return self.__SendMediaRequest(request, self.total_size)
1065
1066    def __SendChunk(self, start, additional_headers=None):
1067        """Send the specified chunk."""
1068        self.EnsureInitialized()
1069        no_log_body = self.total_size is None
1070        request = http_wrapper.Request(url=self.url, http_method='PUT')
1071        if self.__gzip_encoded:
1072            request.headers['Content-Encoding'] = 'gzip'
1073            body_stream, read_length, exhausted = compression.CompressStream(
1074                self.stream, self.chunksize)
1075            end = start + read_length
1076            # If the stream length was previously unknown and the input stream
1077            # is exhausted, then we're at the end of the stream.
1078            if self.total_size is None and exhausted:
1079                self.__total_size = end
1080        elif self.total_size is None:
1081            # For the streaming resumable case, we need to detect when
1082            # we're at the end of the stream.
1083            body_stream = buffered_stream.BufferedStream(
1084                self.stream, start, self.chunksize)
1085            end = body_stream.stream_end_position
1086            if body_stream.stream_exhausted:
1087                self.__total_size = end
1088            # TODO: Here, change body_stream from a stream to a string object,
1089            # which means reading a chunk into memory.  This works around
1090            # https://code.google.com/p/httplib2/issues/detail?id=176 which can
1091            # cause httplib2 to skip bytes on 401's for file objects.
1092            # Rework this solution to be more general.
1093            body_stream = body_stream.read(self.chunksize)
1094        else:
1095            end = min(start + self.chunksize, self.total_size)
1096            body_stream = stream_slice.StreamSlice(self.stream, end - start)
1097        # TODO(craigcitro): Think about clearer errors on "no data in
1098        # stream".
1099        request.body = body_stream
1100        request.headers['Content-Type'] = self.mime_type
1101        if no_log_body:
1102            # Disable logging of streaming body.
1103            # TODO: Remove no_log_body and rework as part of a larger logs
1104            # refactor.
1105            request.loggable_body = '<media body>'
1106        if self.total_size is None:
1107            # Streaming resumable upload case, unknown total size.
1108            range_string = 'bytes %s-%s/*' % (start, end - 1)
1109        elif end == start:
1110            # End of an upload with 0 bytes left to send; just finalize.
1111            range_string = 'bytes */%s' % self.total_size
1112        else:
1113            # Normal resumable upload case with known sizes.
1114            range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size)
1115
1116        request.headers['Content-Range'] = range_string
1117        if additional_headers:
1118            request.headers.update(additional_headers)
1119
1120        return self.__SendMediaRequest(request, end)
1121