1#!/usr/bin/env python
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Library for handling batch HTTP requests for apitools."""
18
19import collections
20import email.generator as generator
21import email.mime.multipart as mime_multipart
22import email.mime.nonmultipart as mime_nonmultipart
23import email.parser as email_parser
24import itertools
25import time
26import uuid
27
28import six
29from six.moves import http_client
30from six.moves import urllib_parse
31from six.moves import range  # pylint: disable=redefined-builtin
32
33from apitools.base.py import exceptions
34from apitools.base.py import http_wrapper
35
# Restrict `from ... import *` on this module to the batching entry point.
__all__ = ['BatchApiRequest']
39
40
class RequestResponseAndHandler(
        collections.namedtuple('RequestResponseAndHandler',
                               'request response handler')):

    """Bundle of one sub-request of a batch, its response and its callback.

    BatchHttpRequest keeps one of these per added request and fills in the
    response field once the batch reply has been parsed.

    Attributes:
      request: The http_wrapper.Request sent as part of the batch.
      response: The http_wrapper.Response parsed for this request, or None
        until one has been received.
      handler: Optional callback invoked as handler(response, exception),
        where exception is an exceptions.HttpError when the response status
        is 300 or above, and None otherwise.
    """
57
58
class BatchApiRequest(object):
    """Batches multiple api requests into a single request.

    Calls are queued with Add() and sent with Execute(), which may split
    them across several batch HTTP requests and retries calls that have not
    reached a terminal state.
    """

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: The value returned by service.ProcessHttpResponse once
              an error-free terminal response has arrived, otherwise None.
          exception: An exceptions.HttpError object if an error
              occurred, or None.

        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: An iterable (list, tuple, ...) of integer
                  HTTP codes that can be retried.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.

            """
            # UNAUTHORIZED is always treated as retryable: Execute() refreshes
            # credentials after a 401 and resends the call. Building a set
            # (instead of list concatenation) accepts tuples and other
            # iterables as well as lists.
            self.__retryable_codes = list(
                set(retryable_codes) | {http_client.UNAUTHORIZED})
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if HandleResponse recorded an exception."""
            return self.exception is not None

        @property
        def response(self):
            """Processed response, or None (see the class docstring)."""
            return self.__response

        @property
        def exception(self):
            """The exception passed to HandleResponse, or None."""
            return self.__exception

        @property
        def authorization_failed(self):
            """True if the last response received had status 401."""
            return (self.__http_response is not None and
                    self.__http_response.status_code ==
                    http_client.UNAUTHORIZED)

        @property
        def terminal_state(self):
            """True once a response with a non-retryable status arrived."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles an incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: exceptions.HttpError object if an error
                  occurred, or None.

            """
            self.__http_response = http_response
            self.__exception = exception
            # Only a final, error-free response is handed to the service
            # for processing; retryable responses are left for a later round.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch'.
          retryable_codes: A list of integer HTTP codes that can be retried.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string naming the desired method of the service; it is
              passed to service.GetMethodConfig and service.GetUploadConfig.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              method.PrepareHttpRequest.

        Returns:
          None

        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5,
                max_batch_size=None, batch_request_callback=None):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              polls.
          max_retries: Max retries. Any requests that have not succeeded by
              this number of retries simply report the last response or
              exception, whatever it happened to be.
          max_batch_size: int, if specified requests will be split in batches
              of given size.
          batch_request_callback: function of (http_response, exception) passed
              to BatchHttpRequest which will be run on any given results.

        Returns:
          The list of all ApiCalls added to this batch (self.api_requests),
          in the order they were added.
        """
        requests = [request for request in self.api_requests
                    if not request.terminal_state]
        if not requests:
            # Nothing to send. Returning here also keeps batch_size from
            # being 0 below, which would make range() raise ValueError.
            return self.api_requests
        batch_size = max_batch_size or len(requests)

        for attempt in range(max_retries):
            if attempt:
                time.sleep(sleep_between_polls)

            for start in range(0, len(requests), batch_size):
                batch = requests[start:start + batch_size]

                # Create a batch_http_request object and populate it with
                # incomplete requests.
                batch_http_request = BatchHttpRequest(
                    batch_url=self.batch_url,
                    callback=batch_request_callback
                )
                for request in batch:
                    batch_http_request.Add(
                        request.http_request, request.HandleResponse)
                batch_http_request.Execute(http)

                # If any call in this batch got a 401, refresh credentials so
                # that the next attempt is sent with fresh ones.
                if hasattr(http.request, 'credentials'):
                    if any(request.authorization_failed for request in batch):
                        http.request.credentials.refresh(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests
                        if not request.terminal_state]
            if not requests:
                break

        return self.api_requests
237
238
class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              exceptions.HttpError object if the individual response had a
              status code of 300 or above, or None otherwise.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Maps request id -> RequestResponseAndHandler. An OrderedDict keeps
        # the order of serialized parts and of callback invocations equal to
        # the order of Add() calls on every Python version.
        self.__request_response_handlers = collections.OrderedDict()

        # Source of auto-generated request ids ('0', '1', ...).
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for an individual request.

        Returns:
          A Content-ID header value of the form '<base_uuid+quoted_id>'.
          The per-batch UUID is prepended because Content-ID headers are
          supposed to be universally unique; the id is URL-quoted so that a
          '+' inside it cannot be confused with the separator.
        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns. Only the text after the last '+' is
        used, so any prefix before it is ignored.

        Args:
          header: A string indicating the Content-ID header value (None if
              the part carried no Content-ID header).

        Returns:
          The extracted, URL-unquoted id value.

        Raises:
          BatchError if the header is missing or not in the expected format.
        """
        # The value must be wrapped in angle brackets on BOTH ends and
        # contain the '+' separator.
        if (not header or not header.startswith('<') or
                not header.endswith('>') or '+' not in header):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct status line. Empty strings (not None) are used for the
        # unused URL components because Python 3's urlunsplit() rejects a
        # mix of None and str arguments.
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        # Python 2's urlunsplit() returns a byte string here; Python 3's
        # already returns text, which has no decode() method.
        if isinstance(request_line, bytes):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))
        # Split only on the first '/', so a '/' inside content-type
        # parameters cannot break the major/minor unpacking.
        major, minor = request.headers.get(
            'content-type', 'application/json').split('/', 1)
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header.
        # Keep all of the other headers in `request.headers`.
        for key, value in request.headers.items():
            if key == 'content-type':
                continue
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert an application/http response string into a Response.

        Args:
          payload: Status line, headers and body of a single response, as
              found in one part of the multipart batch response.

        Returns:
          An http_wrapper.Response whose info holds the response headers
          plus a 'status' key, and whose content is the response body.

        Raises:
          BatchError if the status line carries no status code.
        """
        # Strip off the status line, e.g. "HTTP/1.1 200 OK". The reason
        # phrase may be absent, so only the first two fields are required.
        status_line, payload = payload.split('\n', 1)
        status_fields = status_line.strip().split(' ', 2)
        if len(status_fields) < 2:
            raise exceptions.BatchError(
                'Invalid status line in batch response: %s' % status_line)
        status = status_fields[1]

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
           A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              exceptions.HttpError object if the individual response had a
              status code of 300 or above, or None otherwise.

        Returns:
          None
        """
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Each part of the reply is deserialized and stored on the matching
        RequestResponseAndHandler entry.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          httplib2.HttpLib2Error if a transport error has occurred.
          exceptions.HttpError if the batch request itself returns a status
              code of 300 or above.
          exceptions.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Message should not write out its own headers.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key, handler_info in self.__request_response_handlers.items():
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)

            body = self._SerializeRequest(handler_info.request)
            msg.set_payload(body)
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        content = response.content
        # On Python 3 the transport may return bytes, but the text parser
        # below needs str. (On Python 2 bytes is str, so this is skipped.)
        if isinstance(content, bytes) and not isinstance(content, str):
            content = content.decode('utf-8')

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            if request_id not in self.__request_response_handlers:
                raise exceptions.BatchError(
                    'Unexpected Content-ID in batch response: %s' %
                    part['Content-ID'])
            response = self._DeserializeResponse(part.get_payload())

            # Disable protected access because namedtuple._replace(...)
            # is not actually meant to be protected.
            # pylint: disable=protected-access
            self.__request_response_handlers[request_id] = (
                self.__request_response_handlers[request_id]._replace(
                    response=response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        After the batch completes, each request's own callback and then the
        global callback (when set) are invoked with (response, exception),
        in the order the requests were added.

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format.
        """

        self._Execute(http)

        for handler_info in self.__request_response_handlers.values():
            response = handler_info.response
            callback = handler_info.handler

            exception = None

            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)
493