#!/usr/bin/env python
"""Library for handling batch HTTP requests for apitools."""

import collections
import email.generator as generator
import email.mime.multipart as mime_multipart
import email.mime.nonmultipart as mime_nonmultipart
import email.parser as email_parser
import itertools
import time
import uuid

import six
from six.moves import http_client
from six.moves import urllib_parse

from apitools.base.py import exceptions
from apitools.base.py import http_wrapper

__all__ = [
    'BatchApiRequest',
]


class RequestResponseAndHandler(collections.namedtuple(
        'RequestResponseAndHandler', ['request', 'response', 'handler'])):

    """Container for data related to completing an HTTP request.

    This contains an HTTP request, its response, and a callback for handling
    the response from the server.

    Attributes:
      request: An http_wrapper.Request object representing the HTTP request.
      response: The http_wrapper.Response object returned from the server.
      handler: A callback function accepting two arguments, response
        and exception. Response is an http_wrapper.Response object, and
        exception is an apiclient.errors.HttpError object if an error
        occurred, or otherwise None.
    """


class BatchApiRequest(object):

    """Collects API calls and submits them to the server as batch requests.

    Example (``service``, ``request`` and ``http`` are supplied by the
    caller; 'Get' stands for any method name known to the service):

        batch = BatchApiRequest()
        batch.Add(service, 'Get', request)
        api_calls = batch.Execute(http)
        for api_call in api_calls:
            if api_call.is_error:
                ...  # inspect api_call.exception
            else:
                ...  # use api_call.response
    """

    class ApiCall(object):

        """Holds request and response information for each request.

        ApiCalls are ultimately exposed to the client once the HTTP
        batch request has been completed.

        Attributes:
          http_request: A client-supplied http_wrapper.Request to be
              submitted to the server.
          response: A http_wrapper.Response object given by the server as a
              response to the user request, or None if an error occurred.
          exception: An apiclient.errors.HttpError object if an error
              occurred, or None.

        """

        def __init__(self, request, retryable_codes, service, method_config):
            """Initialize an individual API request.

            Args:
              request: An http_wrapper.Request object.
              retryable_codes: An iterable of integer HTTP codes that can
                  be retried.
              service: A service inheriting from base_api.BaseApiService.
              method_config: Method config for the desired API request.

            """
            # UNAUTHORIZED is always retryable so that a credential refresh
            # gets a chance to fix the call. Building the union from sets
            # (rather than `list + list`) accepts any iterable of codes.
            self.__retryable_codes = list(
                set(retryable_codes) | set([http_client.UNAUTHORIZED]))
            self.__http_response = None
            self.__service = service
            self.__method_config = method_config

            self.http_request = request
            # TODO(user): Add some validation to these fields.
            self.__response = None
            self.__exception = None

        @property
        def is_error(self):
            """True if the last handled response carried an exception."""
            return self.exception is not None

        @property
        def response(self):
            """The processed response, or None if not (successfully) done."""
            return self.__response

        @property
        def exception(self):
            """The exception passed to the last HandleResponse call, if any."""
            return self.__exception

        @property
        def authorization_failed(self):
            """True if the last HTTP response had status UNAUTHORIZED."""
            # Compare against None explicitly (as terminal_state does)
            # instead of relying on the truthiness of the response object.
            return (self.__http_response is not None and
                    self.__http_response.status_code ==
                    http_client.UNAUTHORIZED)

        @property
        def terminal_state(self):
            """True once a response with a non-retryable code was received."""
            if self.__http_response is None:
                return False
            response_code = self.__http_response.status_code
            return response_code not in self.__retryable_codes

        def HandleResponse(self, http_response, exception):
            """Handles an incoming http response to the request in http_request.

            This is intended to be used as a callback function for
            BatchHttpRequest.Add.

            Args:
              http_response: Deserialized http_wrapper.Response object.
              exception: apiclient.errors.HttpError object if an error
                  occurred.

            """
            self.__http_response = http_response
            self.__exception = exception
            # Only turn the raw HTTP response into an API response once the
            # call can no longer be retried and did not fail.
            if self.terminal_state and not self.__exception:
                self.__response = self.__service.ProcessHttpResponse(
                    self.__method_config, self.__http_response)

    def __init__(self, batch_url=None, retryable_codes=None):
        """Initialize a batch API request object.

        Args:
          batch_url: Base URL for batch API calls. Defaults to
              'https://www.googleapis.com/batch'.
          retryable_codes: A list of integer HTTP codes that can be retried.
        """
        self.api_requests = []
        self.retryable_codes = retryable_codes or []
        self.batch_url = batch_url or 'https://www.googleapis.com/batch'

    def Add(self, service, method, request, global_params=None):
        """Add a request to the batch.

        Args:
          service: A class inheriting base_api.BaseApiService.
          method: A string indicating the desired method from the service.
              See the example in the class docstring.
          request: An input message appropriate for the specified
              service.method.
          global_params: Optional additional parameters to pass into
              method.PrepareHttpRequest.

        Returns:
          None

        """
        # Retrieve the configs for the desired method and service.
        method_config = service.GetMethodConfig(method)
        upload_config = service.GetUploadConfig(method)

        # Prepare the HTTP Request.
        http_request = service.PrepareHttpRequest(
            method_config, request, global_params=global_params,
            upload_config=upload_config)

        # Create the request and add it to our master list.
        api_request = self.ApiCall(
            http_request, self.retryable_codes, service, method_config)
        self.api_requests.append(api_request)

    def Execute(self, http, sleep_between_polls=5, max_retries=5):
        """Execute all of the requests in the batch.

        Args:
          http: httplib2.Http object for use in the request.
          sleep_between_polls: Integer number of seconds to sleep between
              polls.
          max_retries: Max retries. Any requests that have not succeeded by
              this number of retries simply report the last response or
              exception, whatever it happened to be.

        Returns:
          List of ApiCalls.
        """
        requests = [request for request in self.api_requests
                    if not request.terminal_state]

        for attempt in range(max_retries):
            # Nothing (left) to send: stop before sleeping or issuing an
            # empty multipart request to the server.
            if not requests:
                break

            if attempt:
                time.sleep(sleep_between_polls)

            # Create a batch_http_request object and populate it with
            # incomplete requests.
            batch_http_request = BatchHttpRequest(batch_url=self.batch_url)
            for request in requests:
                batch_http_request.Add(
                    request.http_request, request.HandleResponse)
            batch_http_request.Execute(http)

            # Collect retryable requests.
            requests = [request for request in self.api_requests
                        if not request.terminal_state]

            # Refresh the credentials once if any pending call was rejected
            # as unauthorized, so that the retry can succeed.
            if hasattr(http.request, 'credentials'):
                if any(request.authorization_failed for request in requests):
                    http.request.credentials.refresh(http)

        return self.api_requests


class BatchHttpRequest(object):

    """Batches multiple http_wrapper.Request objects into a single request."""

    def __init__(self, batch_url, callback=None):
        """Constructor for a BatchHttpRequest.

        Args:
          batch_url: URL to send batch requests to.
          callback: A callback to be called for each response, of the
              form callback(response, exception). The first parameter is
              the deserialized Response object. The second is an
              apiclient.errors.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no error
              occurred.
        """
        # Endpoint to which these requests are sent.
        self.__batch_url = batch_url

        # Global callback to be called for each individual response in the
        # batch.
        self.__callback = callback

        # Mapping of request id -> RequestResponseAndHandler.
        self.__request_response_handlers = {}

        # Counter that yields the next auto generated id.
        self.__last_auto_id = itertools.count()

        # Unique ID on which to base the Content-ID headers.
        self.__base_id = uuid.uuid4()

    def _ConvertIdToHeader(self, request_id):
        """Convert an id to a Content-ID header value.

        Args:
          request_id: String identifier for an individual request.

        Returns:
          A Content-ID header with the id_ encoded into it. A UUID is
          prepended to the value because Content-ID headers are
          supposed to be universally unique.

        """
        return '<%s+%s>' % (self.__base_id, urllib_parse.quote(request_id))

    @staticmethod
    def _ConvertHeaderToId(header):
        """Convert a Content-ID header value to an id.

        Presumes the Content-ID header conforms to the format that
        _ConvertIdToHeader() returns.

        Args:
          header: A string indicating the Content-ID header value.

        Returns:
          The extracted id value.

        Raises:
          BatchError if the header is missing or not in the expected format.
        """
        # Both angle brackets are required; a header lacking either one
        # (or a missing header, i.e. None) is malformed.
        if header is None or not (
                header.startswith('<') and header.endswith('>')):
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        if '+' not in header:
            raise exceptions.BatchError(
                'Invalid value for Content-ID: %s' % header)
        # The id is everything after the last '+'; what precedes it is the
        # per-batch UUID prefix.
        _, request_id = header[1:-1].rsplit('+', 1)

        return urllib_parse.unquote(request_id)

    def _SerializeRequest(self, request):
        """Convert a http_wrapper.Request object into a string.

        Args:
          request: A http_wrapper.Request to serialize.

        Returns:
          The request as a string in application/http format.
        """
        # Construct status line. Only path and query are kept; empty strings
        # (not None) are used for the dropped components because Python 3's
        # urlunsplit refuses to mix str and non-str arguments.
        parsed = urllib_parse.urlsplit(request.url)
        request_line = urllib_parse.urlunsplit(
            ('', '', parsed.path, parsed.query, ''))
        # On Python 2 the result may be a byte string; on Python 3 it is
        # already text and has no .decode().
        if not isinstance(request_line, six.text_type):
            request_line = request_line.decode('utf-8')
        status_line = u' '.join((
            request.http_method,
            request_line,
            u'HTTP/1.1\n'
        ))
        # Split on the first '/' only, so that a '/' inside a content-type
        # parameter does not break the unpacking.
        major, minor = request.headers.get(
            'content-type', 'application/json').split('/', 1)
        msg = mime_nonmultipart.MIMENonMultipart(major, minor)

        # MIMENonMultipart adds its own Content-Type header.
        # Keep all of the other headers in `request.headers`.
        for key, value in request.headers.items():
            if key == 'content-type':
                continue
            msg[key] = value

        msg['Host'] = parsed.netloc
        msg.set_unixfrom(None)

        if request.body is not None:
            msg.set_payload(request.body)

        # Serialize the mime message.
        str_io = six.StringIO()
        # maxheaderlen=0 means don't line wrap headers.
        gen = generator.Generator(str_io, maxheaderlen=0)
        gen.flatten(msg, unixfrom=False)
        body = str_io.getvalue()

        return status_line + body

    def _DeserializeResponse(self, payload):
        """Convert string into Response and content.

        Args:
          payload: Header and body string to be deserialized.

        Returns:
          A Response object
        """
        # Strip off the status line.
        status_line, payload = payload.split('\n', 1)
        _, status, _ = status_line.split(' ', 2)

        # Parse the rest of the response.
        parser = email_parser.Parser()
        msg = parser.parsestr(payload)

        # Get the headers.
        info = dict(msg)
        info['status'] = status

        # Create Response from the parsed headers.
        content = msg.get_payload()

        return http_wrapper.Response(info, content, self.__batch_url)

    def _NewId(self):
        """Create a new id.

        Auto incrementing number that avoids conflicts with ids already used.

        Returns:
           A new unique id string.
        """
        return str(next(self.__last_auto_id))

    def Add(self, request, callback=None):
        """Add a new request.

        Args:
          request: A http_wrapper.Request to add to the batch.
          callback: A callback to be called for this response, of the
              form callback(response, exception). The first parameter is the
              deserialized response object. The second is an
              apiclient.errors.HttpError exception object if an HTTP error
              occurred while processing the request, or None if no errors
              occurred.

        Returns:
          None
        """
        # The response slot stays None until _Execute fills it in.
        handler = RequestResponseAndHandler(request, None, callback)
        self.__request_response_handlers[self._NewId()] = handler

    def _Execute(self, http):
        """Serialize batch request, send to server, process response.

        Args:
          http: A httplib2.Http object to be used to make the request with.

        Raises:
          httplib2.HttpLib2Error if a transport error has occurred.
          apiclient.errors.BatchError if the response is the wrong format.
        """
        message = mime_multipart.MIMEMultipart('mixed')
        # Message should not write out its own headers.
        setattr(message, '_write_headers', lambda self: None)

        # Add all the individual requests.
        for key in self.__request_response_handlers:
            msg = mime_nonmultipart.MIMENonMultipart('application', 'http')
            msg['Content-Transfer-Encoding'] = 'binary'
            msg['Content-ID'] = self._ConvertIdToHeader(key)

            body = self._SerializeRequest(
                self.__request_response_handlers[key].request)
            msg.set_payload(body)
            message.attach(msg)

        request = http_wrapper.Request(self.__batch_url, 'POST')
        request.body = message.as_string()
        request.headers['content-type'] = (
            'multipart/mixed; boundary="%s"') % message.get_boundary()

        response = http_wrapper.MakeRequest(http, request)

        if response.status_code >= 300:
            raise exceptions.HttpError.FromResponse(response)

        # Prepend with a content-type header so Parser can handle it.
        header = 'content-type: %s\r\n\r\n' % response.info['content-type']

        parser = email_parser.Parser()
        mime_response = parser.parsestr(header + response.content)

        if not mime_response.is_multipart():
            raise exceptions.BatchError(
                'Response not in multipart/mixed format.')

        # Match every returned part to its request through the Content-ID.
        for part in mime_response.get_payload():
            request_id = self._ConvertHeaderToId(part['Content-ID'])
            part_response = self._DeserializeResponse(part.get_payload())

            # Disable protected access because namedtuple._replace(...)
            # is not actually meant to be protected.
            self.__request_response_handlers[request_id] = (
                self.__request_response_handlers[request_id]._replace(
                    response=part_response))

    def Execute(self, http):
        """Execute all the requests as a single batched HTTP request.

        Args:
          http: A httplib2.Http object to be used with the request.

        Returns:
          None

        Raises:
          BatchError if the response is the wrong format.
        """

        self._Execute(http)

        for key in self.__request_response_handlers:
            response = self.__request_response_handlers[key].response
            callback = self.__request_response_handlers[key].handler

            exception = None

            # Any individual status of 300 or above is reported to the
            # callbacks as an HttpError alongside the response.
            if response.status_code >= 300:
                exception = exceptions.HttpError.FromResponse(response)

            # Per-request callback first, then the batch-wide one.
            if callback is not None:
                callback(response, exception)
            if self.__callback is not None:
                self.__callback(response, exception)