1#!/usr/bin/env python
2"""Library for handling batch HTTP requests for apitools."""
3
4import collections
5import email.generator as generator
6import email.mime.multipart as mime_multipart
7import email.mime.nonmultipart as mime_nonmultipart
8import email.parser as email_parser
9import itertools
10import time
11import uuid
12
13import six
14from six.moves import http_client
15from six.moves import urllib_parse
16
17from apitools.base.py import exceptions
18from apitools.base.py import http_wrapper
19
20__all__ = [
21    'BatchApiRequest',
22]
23
24
class RequestResponseAndHandler(collections.namedtuple(
        'RequestResponseAndHandler', ['request', 'response', 'handler'])):

    """Container for data related to completing an HTTP request.

    This contains an HTTP request, its response, and a callback for handling
    the response from the server.

    Attributes:
      request: An http_wrapper.Request object representing the HTTP request.
      response: The http_wrapper.Response object returned from the server,
        or None while no response has been recorded yet.
      handler: A callback function accepting two arguments, response
        and exception. Response is an http_wrapper.Response object, and
        exception is an exceptions.HttpError object if an error
        occurred, or otherwise None.
    """

    # Keep instances as lightweight as the underlying tuple: without this,
    # subclassing a namedtuple gives every instance its own __dict__.
    __slots__ = ()
41
42
class BatchApiRequest(object):

    """Batches multiple API requests into a single HTTP request.

    Requests are queued with Add() and sent together by Execute(). Calls
    whose response code is retryable are sent again on later attempts.
    """

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: The result of service.ProcessHttpResponse for the
              server's reply, or None if an error occurred or the call
              has not reached a terminal state.
          exception: The exception handed to HandleResponse if an error
              occurred, or None.

        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: An iterable (list, tuple, set, ...) of
                  integer HTTP codes that can be retried.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.

            """
            # UNAUTHORIZED is always retryable: Execute() refreshes the
            # credentials and sends the call again. Building a set first
            # also accepts non-list iterables and drops duplicates.
            self.__retryable_codes = list(
                set(retryable_codes) | {http_client.UNAUTHORIZED})
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if the last handled response carried an exception."""
            return self.exception is not None

        @property
        def response(self):
            """Processed response, or None if not (successfully) finished."""
            return self.__response

        @property
        def exception(self):
            """Exception recorded by HandleResponse, or None."""
            return self.__exception

        @property
        def authorization_failed(self):
            """True if the last HTTP response was 401 UNAUTHORIZED."""
            # Always a real boolean, even before any response has arrived.
            return (self.__http_response is not None and
                    self.__http_response.status_code ==
                    http_client.UNAUTHORIZED)

        @property
        def terminal_state(self):
            """True once a response with a non-retryable code has arrived."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles an incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: exceptions.HttpError object if an error
                  occurred, otherwise None.

            """
            self.__http_response = http_response
            self.__exception = exception
            # Only hand the reply to the service once it is final and clean;
            # retryable or failed replies leave `response` as None.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch'.
          retryable_codes: A list of integer HTTP codes that can be retried.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string naming the desired method on the service.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              method.PrepareHttpRequest.

        Returns:
          None

        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              polls.
          max_retries: Maximum number of times the batch is sent. Any
              requests that have not succeeded by this number of attempts
              simply report the last response or exception, whatever it
              happened to be.

        Returns:
          List of ApiCalls.
        """
        requests = [request for request in self.api_requests
                    if not request.terminal_state]

        for attempt in range(max_retries):
            # Nothing (left) to send: never post an empty batch.
            if not requests:
                break

            if attempt:
                time.sleep(sleep_between_polls)

            # Create a batch_http_request object and populate it with
            # incomplete requests.
            batch_http_request = BatchHttpRequest(batch_url=self.batch_url)
            for request in requests:
                batch_http_request.Add(
                    request.http_request, request.HandleResponse)
            batch_http_request.Execute(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests
                        if not request.terminal_state]

            # A 401 on any pending call means the credentials need
            # refreshing before the next attempt.
            if hasattr(http.request, 'credentials'):
                if any(request.authorization_failed for request in requests):
                    http.request.credentials.refresh(http)

        return self.api_requests
208
209
class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              exceptions.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no error
              occurred.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Map of request id to its RequestResponseAndHandler.
        self.__request_response_handlers = {}

        # Counter that yields the next auto generated id.
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for an individual request.

        Returns:
          A Content-ID header with the id_ encoded into it. A UUID is
          prepended to the value because Content-ID headers are
          supposed to be universally unique.

        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns.

        Args:
          header: A string indicating the Content-ID header value.

        Returns:
          The extracted id value.

        Raises:
          BatchError if the header is missing or not in the expected format.
        """
        # Both delimiters are required: the slice below drops the first and
        # last characters unconditionally, so accepting a header with only
        # one of them would silently corrupt the id.
        if (not header or
                not header.startswith('<') or
                not header.endswith('>') or
                '+' not in header):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        # The id follows the last '+'; everything before it is the base id.
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct status line. Only path and query are kept; the unused
        # components must be empty strings (not None), because Python 3's
        # urlunsplit refuses to mix str and non-str arguments.
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        # Python 2 may return a byte string here; text has no decode() on
        # Python 3, so only decode when it is actually needed.
        if not isinstance(request_line, six.text_type):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))
        # Split on the first '/' only so that a '/' inside a content-type
        # parameter (e.g. a boundary) does not break the unpacking.
        major, minor = request.headers.get(
            'content-type', 'application/json').split('/', 1)
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header.
        # Keep all of the other headers in `request.headers`.
        for key, value in request.headers.items():
            if key == 'content-type':
                continue
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert string into Response and content.

        Args:
          payload: Header and body string to be deserialized.

        Returns:
          A Response object

        Raises:
          BatchError if the status line cannot be parsed.
        """
        # Strip off the status line.
        status_line, _, payload = payload.partition('\n')
        # "<version> <status> [<reason>]": the reason phrase is optional,
        # so only the first two fields are required.
        fields = status_line.strip().split(' ', 2)
        if len(fields) < 2:
            raise exceptions.BatchError(
                'Invalid status line in batch response: %s' % status_line)
        status = fields[1]

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
           A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              exceptions.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no errors
              occurred.

        Returns:
          None
        """
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          httplib2.HttpLib2Error if a transport error has occurred.
          exceptions.HttpError if the batch call itself returns a status
              of 300 or above.
          exceptions.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Message should not write out its own headers.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key, handler in self.__request_response_handlers.items():
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)
            msg.set_payload(self._SerializeRequest(handler.request))
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # The header below is text, so the body must be text too before the
        # two are concatenated and parsed.
        # NOTE(review): UTF-8 is assumed for byte content on Python 3 --
        # confirm against the batch endpoint's declared charset.
        content = response.content
        if six.PY3 and isinstance(content, six.binary_type):
            content = content.decode('utf-8')

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            part_response = self._DeserializeResponse(part.get_payload())

            # namedtuple._replace(...) is part of the public namedtuple
            # API despite the leading underscore.
            self.__request_response_handlers[request_id] = (
                self.__request_response_handlers[request_id]._replace(
                    response=part_response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        After the batch completes, the per-request callback and then the
        global callback (when set) are invoked for every request.

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format.
        """

        self._Execute(http)

        for handler in self.__request_response_handlers.values():
            response = handler.response
            callback = handler.handler

            exception = None

            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)
463