#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Transport library for ProtoRPC.

Contains underlying infrastructure used for communicating RPCs over low level
transports such as HTTP.

Includes HTTP transport built over httplib (http.client on Python 3).
"""

import six.moves.http_client
import logging
import os
import socket
import sys
try:
  import urlparse
except ImportError:  # Python 3: the module moved to urllib.parse.
  from urllib import parse as urlparse

from . import messages
from . import protobuf
from . import remote
from . import util
import six

__all__ = [
    'RpcStateError',

    'HttpTransport',
    'LocalTransport',
    'Rpc',
    'Transport',
]


class RpcStateError(messages.Error):
  """Raised when trying to put RPC in to an invalid state."""


class Rpc(object):
  """Represents a client side RPC.

  An RPC is created by the transport class and is used with a single RPC.  While
  an RPC is still in process, the response is set to None.  When it is complete
  the response will contain the response message.

  Transports attach the completion logic by assigning a callable to the
  instance's _wait_impl attribute; that callable is expected to finish the RPC
  by calling set_response or set_status.
  """

  def __init__(self, request):
    """Constructor.

    Args:
      request: Request associated with this RPC.
    """
    self.__request = request
    self.__response = None
    self.__state = remote.RpcState.RUNNING
    self.__error_message = None
    self.__error_name = None

  @property
  def request(self):
    """Request associated with RPC."""
    return self.__request

  @property
  def response(self):
    """Response associated with RPC.

    Waits for the RPC to finish if it is still running.

    Raises:
      The error class that remote.RpcError.from_state associates with the
      final state, if there is one.
    """
    self.wait()
    self.__check_status()
    return self.__response

  @property
  def state(self):
    """State associated with RPC.  Does not wait for completion."""
    return self.__state

  @property
  def error_message(self):
    """Error, if any, associated with RPC.  Waits for the RPC to finish."""
    self.wait()
    return self.__error_message

  @property
  def error_name(self):
    """Error name, if any, associated with RPC.  Waits for the RPC to finish."""
    self.wait()
    return self.__error_name

  def wait(self):
    """Wait for an RPC to finish.

    Does nothing when the RPC has already left the RUNNING state.
    """
    if self.__state == remote.RpcState.RUNNING:
      self._wait_impl()

  def _wait_impl(self):
    """Implementation for wait().

    Transports replace this on the instance; the base version is abstract.
    """
    raise NotImplementedError()

  def __check_status(self):
    """Raise the error matching the current state, if there is one.

    ApplicationError additionally receives the error name.
    """
    error_class = remote.RpcError.from_state(self.__state)
    if error_class is not None:
      if error_class is remote.ApplicationError:
        raise error_class(self.__error_message, self.__error_name)
      else:
        raise error_class(self.__error_message)

  def __set_state(self, state, error_message=None, error_name=None):
    """Move the RPC out of RUNNING in to a final state.

    Args:
      state: New state; must not be RUNNING.
      error_message: Error message to record with the state, if any.
      error_name: Error name to record with the state, if any.

    Raises:
      RpcStateError: If the RPC is no longer RUNNING or if the requested state
        is RUNNING.
    """
    if self.__state != remote.RpcState.RUNNING:
      raise RpcStateError(
          'RPC must be in RUNNING state to change to %s' % state)
    if state == remote.RpcState.RUNNING:
      raise RpcStateError('RPC is already in RUNNING state')
    self.__state = state
    self.__error_message = error_message
    self.__error_name = error_name

  def set_response(self, response):
    """Complete the RPC successfully.

    Args:
      response: Response message for this RPC.

    Raises:
      TypeError: If response is not a messages.Message instance.
      RpcStateError: If the RPC is no longer RUNNING.
    """
    # TODO: Even more specific type checking.
    if not isinstance(response, messages.Message):
      # The 1-tuple keeps the formatting correct when response is itself a
      # tuple.
      raise TypeError('Expected Message type, received %r' % (response,))

    self.__response = response
    self.__set_state(remote.RpcState.OK)

  def set_status(self, status):
    """Complete the RPC using the state and error details of a status message.

    Args:
      status: Status message providing state, error_message and error_name.
        check_initialized() is called on it first.

    Raises:
      RpcStateError: If the RPC is no longer RUNNING or status.state is RUNNING.
    """
    status.check_initialized()
    self.__set_state(status.state, status.error_message, status.error_name)


class Transport(object):
  """Transport base class.

  Provides basic support for implementing a ProtoRPC transport such as one
  that can send and receive messages over HTTP.

  Implementations override _start_rpc.  This method receives a RemoteInfo
  instance and a request Message. The transport is expected to set the rpc
  response or raise an exception before termination.
  """

  @util.positional(1)
  def __init__(self, protocol=protobuf):
    """Constructor.

    Args:
      protocol: If string, will look up a protocol from the default Protocols
        instance by name, falling back to a look up by content type when the
        name is unknown.  Can also be an instance of remote.ProtocolConfig.
        If neither, it must be an object that implements a protocol interface
        by implementing encode_message, decode_message and set CONTENT_TYPE.
        For example, the modules protobuf and protojson can be used directly.
    """
    if isinstance(protocol, six.string_types):
      protocols = remote.Protocols.get_default()
      try:
        protocol = protocols.lookup_by_name(protocol)
      except KeyError:
        protocol = protocols.lookup_by_content_type(protocol)
    if isinstance(protocol, remote.ProtocolConfig):
      self.__protocol = protocol.protocol
      self.__protocol_config = protocol
    else:
      # Wrap a bare protocol implementation in a config named 'default'.
      self.__protocol = protocol
      self.__protocol_config = remote.ProtocolConfig(
          protocol, 'default', default_content_type=protocol.CONTENT_TYPE)

  @property
  def protocol(self):
    """Protocol associated with this transport."""
    return self.__protocol

  @property
  def protocol_config(self):
    """Protocol configuration associated with this transport."""
    return self.__protocol_config

  def send_rpc(self, remote_info, request):
    """Initiate sending an RPC over the transport.

    The request has check_initialized() called on it before the RPC is started.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.
    """
    request.check_initialized()

    rpc = self._start_rpc(remote_info, request)

    return rpc

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.
    """
    raise NotImplementedError()


class HttpTransport(Transport):
  """Transport for communicating with HTTP servers."""

  @util.positional(2)
  def __init__(self,
               service_url,
               protocol=protobuf):
    """Constructor.

    Args:
      service_url: URL where the service is located.  All communication via
        the transport will go to this URL.
      protocol: The protocol implementation.  Must implement encode_message and
        decode_message.  Can also be an instance of remote.ProtocolConfig.
    """
    super(HttpTransport, self).__init__(protocol=protocol)
    self.__service_url = service_url

  def __get_rpc_status(self, response, content):
    """Get RPC status from HTTP response.

    Args:
      response: HTTPResponse object.
      content: Content read from HTTP response.

    Returns:
      RpcStatus object parsed from response, else an RpcStatus with a generic
      HTTP error.
    """
    # Statuses of 400 and above may carry an RpcStatus message in the body.
    if response.status >= 400:
      content_type = response.getheader('content-type')
      if content_type == self.protocol_config.default_content_type:
        try:
          rpc_status = self.protocol.decode_message(remote.RpcStatus, content)
        except Exception as decode_err:
          logging.warning(
              'An error occurred trying to parse status: %s\n%s',
              str(decode_err), content)
        else:
          if rpc_status.is_initialized():
            return rpc_status
          else:
            logging.warning(
                'Body does not result in an initialized RpcStatus message:\n%s',
                content)

    # If no RpcStatus message present, attempt to forward any content.  If empty
    # use standard error message.
    if not content.strip():
      content = six.moves.http_client.responses.get(response.status,
                                                    'Unknown Error')
    elif isinstance(content, bytes) and not isinstance(content, str):
      # Only true on Python 3, where the body is bytes and would otherwise be
      # rendered as "b'...'" in the error message.
      content = content.decode('utf-8', 'replace')
    return remote.RpcStatus(state=remote.RpcState.SERVER_ERROR,
                            error_message='HTTP Error %s: %s' % (
                                response.status, content or 'Unknown Error'))

  def __set_response(self, remote_info, connection, rpc):
    """Set response on RPC.

    Sets response or status from HTTP request.  Implements the wait method of
    Rpc instance.  The connection is always closed afterwards.

    Args:
      remote_info: Remote info for invoked RPC.
      connection: HTTPConnection that is making request.
      rpc: Rpc instance.
    """
    try:
      http_response = connection.getresponse()

      content = http_response.read()

      if http_response.status == six.moves.http_client.OK:
        message = self.protocol.decode_message(remote_info.response_type,
                                               content)
        rpc.set_response(message)
      else:
        status = self.__get_rpc_status(http_response, content)
        rpc.set_status(status)
    finally:
      connection.close()

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.

    The request is sent immediately; the HTTP response is only read when the
    returned Rpc is waited on.

    Args:
      remote_info: A RemoteInfo instance for this RPC.
      request: The request message for this RPC.

    Returns:
      An Rpc instance initialized with a Request.

    Raises:
      remote.RpcError: Passed through unchanged when raised while sending.
      remote.NetworkError: For socket errors and any other exception raised
        while sending the request.
    """
    # The method name is appended to the service URL after a '.'.
    method_url = '%s.%s' % (self.__service_url, remote_info.method.__name__)
    encoded_request = self.protocol.encode_message(request)

    url = urlparse.urlparse(method_url)
    if url.scheme == 'https':
      connection_type = six.moves.http_client.HTTPSConnection
    else:
      connection_type = six.moves.http_client.HTTPConnection
    connection = connection_type(url.hostname, url.port)
    try:
      self._send_http_request(connection, url.path, encoded_request)
      rpc = Rpc(request)
    except remote.RpcError:
      # Pass through all ProtoRPC errors
      connection.close()
      raise
    except socket.error as err:
      connection.close()
      raise remote.NetworkError('Socket error: %s %r' % (type(err).__name__,
                                                         err.args),
                                err)
    except Exception as err:
      connection.close()
      raise remote.NetworkError('Error communicating with HTTP server',
                                err)
    else:
      # Defer reading the response until the caller waits on the RPC.
      wait_impl = lambda: self.__set_response(remote_info, connection, rpc)
      rpc._wait_impl = wait_impl

      return rpc

  def _send_http_request(self, connection, http_path, encoded_request):
    """POST the encoded request over the connection.

    Args:
      connection: HTTPConnection used to send the request.
      http_path: Path component of the method URL.
      encoded_request: Request message already encoded with the protocol.
    """
    connection.request(
        'POST',
        http_path,
        encoded_request,
        headers={'Content-type': self.protocol_config.default_content_type,
                 'Content-length': len(encoded_request)})


def _local_hostname():
  """Return the name of the local host.

  os.uname is only available on Unix, so fall back to socket.gethostname
  on platforms that do not provide it.
  """
  try:
    uname = os.uname
  except AttributeError:
    return socket.gethostname()
  return uname()[1]


class LocalTransport(Transport):
  """Local transport that sends messages directly to services.

  Useful in tests or creating code that can work with either local or remote
  services.  Using LocalTransport is preferable to simply instantiating a
  single instance of a service and reusing it.  The entire request process
  involves instantiating a new instance of a service, initializing it with
  request state and then invoking the remote method for every request.
  """

  def __init__(self, service_factory):
    """Constructor.

    Args:
      service_factory: Service factory or class.  When it has a service_class
        attribute that is reported as the service class, otherwise the factory
        itself is.
    """
    super(LocalTransport, self).__init__()
    self.__service_class = getattr(service_factory,
                                   'service_class',
                                   service_factory)
    self.__service_factory = service_factory

  @property
  def service_class(self):
    """Service class associated with this transport."""
    return self.__service_class

  @property
  def service_factory(self):
    """Callable used to create a service instance for every RPC."""
    return self.__service_factory

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.

    The service is only instantiated and invoked when the returned Rpc is
    waited on.  At that point remote.ApplicationError propagates unchanged and
    any other exception is re-raised as remote.ServerError.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.
    """
    rpc = Rpc(request)

    def wait_impl():
      instance = self.__service_factory()
      try:
        initialize_request_state = instance.initialize_request_state
      except AttributeError:
        # Services are not required to accept request state.
        pass
      else:
        host = six.text_type(_local_hostname())
        initialize_request_state(remote.RequestState(remote_host=host,
                                                     remote_address=u'127.0.0.1',
                                                     server_host=host,
                                                     server_port=-1))
      try:
        response = remote_info.method(instance, request)
        assert isinstance(response, remote_info.response_type)
      except remote.ApplicationError:
        raise
      except Exception:
        # Exception rather than a bare except so that KeyboardInterrupt and
        # SystemExit are not reported as server errors.
        exc_type, exc_value, traceback = sys.exc_info()
        message = 'Unexpected error %s: %s' % (exc_type.__name__, exc_value)
        # six.reraise needs an exception instance as its value on Python 3.
        six.reraise(remote.ServerError, remote.ServerError(message), traceback)
      rpc.set_response(response)
    rpc._wait_impl = wait_impl
    return rpc