#!/usr/bin/env python
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Library for handling batch HTTP requests for apitools."""

import collections
import email.generator as generator
import email.mime.multipart as mime_multipart
import email.mime.nonmultipart as mime_nonmultipart
import email.parser as email_parser
import itertools
import time
import uuid

import six
from six.moves import http_client
from six.moves import urllib_parse
from six.moves import range  # pylint: disable=redefined-builtin

from apitools.base.py import exceptions
from apitools.base.py import http_wrapper

__all__ = [
    'BatchApiRequest',
]


class RequestResponseAndHandler(collections.namedtuple(
        'RequestResponseAndHandler', ['request', 'response', 'handler'])):

    """Container for data related to completing an HTTP request.

    This contains an HTTP request, its response, and a callback for handling
    the response from the server.

    Attributes:
      request: An http_wrapper.Request object representing the HTTP request.
      response: The http_wrapper.Response object returned from the server,
          or None until the batch response has been parsed.
      handler: A callback function accepting two arguments, response
          and exception. Response is an http_wrapper.Response object, and
          exception is an exceptions.HttpError object if an error
          occurred, or otherwise None.
    """


class BatchApiRequest(object):

    """Batches multiple api requests into a single request."""

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: The result of service.ProcessHttpResponse once the
              call reached a non-retryable status without an error;
              otherwise None.
          exception: An exceptions.HttpError object if the most recent
              response was an error, or None.
        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: An iterable of integer HTTP codes that can
                  be retried.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.
            """
            # 401 is always treated as retryable: BatchApiRequest.Execute
            # refreshes credentials after an authorization failure and then
            # resends the call. Accept any iterable, not just lists.
            self.__retryable_codes = set(retryable_codes)
            self.__retryable_codes.add(http_client.UNAUTHORIZED)
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if the most recent response produced an exception."""
            return self.exception is not None

        @property
        def response(self):
            """The processed response, or None (see class docstring)."""
            return self.__response

        @property
        def exception(self):
            """The HttpError from the most recent response, or None."""
            return self.__exception

        @property
        def authorization_failed(self):
            """True if the most recent response had status 401."""
            # Compare against None explicitly instead of relying on the
            # truthiness of the response object, and always return a bool.
            return (self.__http_response is not None and
                    self.__http_response.status_code ==
                    http_client.UNAUTHORIZED)

        @property
        def terminal_state(self):
            """True once a response with a non-retryable status arrived."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles an incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: exceptions.HttpError object if an error
                  occurred, otherwise None.
            """
            self.__http_response = http_response
            self.__exception = exception
            # Only a final, error-free response is handed to the service
            # for decoding; retryable responses are left for a later pass.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch'.
          retryable_codes: A list of integer HTTP codes that can be retried.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string naming the desired method on the service.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              method.PrepareHttpRequest.

        Returns:
          None
        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5,
                max_batch_size=None, batch_request_callback=None,
                response_encoding=None):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              polls.
          max_retries: Max retries. Any requests that have not succeeded by
              this number of retries simply report the last response or
              exception, whatever it happened to be.
          max_batch_size: int, if specified requests will be split in batches
              of given size.
          batch_request_callback: function of (http_response, exception)
              passed to BatchHttpRequest which will be run on any given
              results.
          response_encoding: Optional encoding used to decode a bytes
              batch response body before parsing (e.g. 'utf-8'). None
              keeps the previous behavior of parsing the content as-is.

        Returns:
          List of ApiCalls (every call added to this batch).
        """
        requests = [request for request in self.api_requests
                    if not request.terminal_state]
        if not requests:
            # Nothing left to send. Returning early also avoids calling
            # range() with a zero step when max_batch_size is unset.
            return self.api_requests
        batch_size = max_batch_size or len(requests)

        for attempt in range(max_retries):
            if attempt:
                time.sleep(sleep_between_polls)

            for i in range(0, len(requests), batch_size):
                chunk = requests[i:i + batch_size]
                # Create a batch_http_request object and populate it with
                # incomplete requests.
                batch_http_request = BatchHttpRequest(
                    batch_url=self.batch_url,
                    callback=batch_request_callback,
                    response_encoding=response_encoding,
                )
                for request in chunk:
                    batch_http_request.Add(
                        request.http_request, request.HandleResponse)
                batch_http_request.Execute(http)

                # If any call in this chunk was rejected with 401, refresh
                # the credentials attached to http.request (when present)
                # so the next attempt can succeed.
                if hasattr(http.request, 'credentials'):
                    if any(request.authorization_failed
                           for request in chunk):
                        http.request.credentials.refresh(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests
                        if not request.terminal_state]
            if not requests:
                break

        return self.api_requests


class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None, response_encoding=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              exceptions.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no error
              occurred.
          response_encoding: Optional encoding used to decode a bytes
              response body before it is parsed. None parses the content
              as returned by the transport.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Encoding for decoding a bytes batch response (None = no decoding).
        self.__response_encoding = response_encoding

        # Maps request id -> RequestResponseAndHandler. An OrderedDict keeps
        # parts serialized, and callbacks run, in the order they were added.
        self.__request_response_handlers = collections.OrderedDict()

        # Counter yielding successive integers used by _NewId.
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for an individual request.

        Returns:
          A Content-ID header with the id_ encoded into it. A UUID is
          prepended to the value because Content-ID headers are
          supposed to be universally unique.
        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns.

        Args:
          header: A string indicating the Content-ID header value.

        Returns:
          The extracted id value.

        Raises:
          BatchError if the header is missing or not in the expected format.
        """
        # The value must be wrapped in BOTH '<' and '>'; a missing header
        # (None) is rejected the same way.
        if not header or not (header.startswith('<') and
                              header.endswith('>')):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        if '+' not in header:
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct status line. Empty strings (not None) are required for
        # the unused components on Python 3, where urlunsplit refuses to mix
        # None with str.
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        # On Python 2 urlunsplit may return a byte string; on Python 3 it is
        # already text and has no decode() method.
        if not isinstance(request_line, six.text_type):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))
        major, minor = request.headers.get(
            'content-type', 'application/json').split('/')
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header.
        # Keep all of the other headers in `request.headers`.
        for key, value in request.headers.items():
            if key == 'content-type':
                continue
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert string into Response and content.

        Args:
          payload: Header and body string to be deserialized.

        Returns:
          A Response object

        Raises:
          BatchError if the status line cannot be parsed.
        """
        # Strip off the status line.
        status_line, _, payload = payload.partition('\n')
        # Whitespace split tolerates a trailing '\r' and a status line with
        # no reason phrase (e.g. 'HTTP/1.1 204').
        status_parts = status_line.split(None, 2)
        if len(status_parts) < 2:
            raise exceptions.BatchError(
                'Invalid status line in batch response: %s' % status_line)
        status = status_parts[1]

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
          A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              exceptions.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no errors
              occurred.

        Returns:
          None
        """
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          httplib2.HttpLib2Error if a transport error has occurred.
          exceptions.HttpError if the batch request itself failed.
          exceptions.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Message should not write out its own headers.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key, handler in self.__request_response_handlers.items():
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)
            msg.set_payload(self._SerializeRequest(handler.request))
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        # Concatenating str and bytes fails on Python 3; decode when the
        # caller supplied an encoding.
        content = response.content
        if isinstance(content, bytes) and self.__response_encoding:
            content = content.decode(self.__response_encoding)

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            handler = self.__request_response_handlers.get(request_id)
            if handler is None:
                raise exceptions.BatchError(
                    'Unexpected Content-ID in batch response: %s' %
                    part['Content-ID'])
            part_response = self._DeserializeResponse(part.get_payload())

            # Disable protected access because namedtuple._replace(...)
            # is not actually meant to be protected.
            # pylint: disable=protected-access
            self.__request_response_handlers[request_id] = (
                handler._replace(response=part_response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format, including when it
          lacks a part for one of the added requests.
        """
        self._Execute(http)

        # Fail before invoking any callback if the server omitted a part,
        # rather than crashing mid-loop on a None response.
        missing = [key for key, handler
                   in self.__request_response_handlers.items()
                   if handler.response is None]
        if missing:
            raise exceptions.BatchError(
                'Batch response is missing responses for request ids: %s' %
                ', '.join(missing))

        for handler in self.__request_response_handlers.values():
            response = handler.response
            callback = handler.handler

            exception = None

            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)