1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Transport library for ProtoRPC.
19
20Contains underlying infrastructure used for communicating RPCs over low level
21transports such as HTTP.
22
Includes HTTP transport built over httplib (six.moves.http_client).
24"""
25
26import six.moves.http_client
27import logging
28import os
29import socket
30import sys
31import urlparse
32
33from . import messages
34from . import protobuf
35from . import remote
36from . import util
37import six
38
# Names exported by ``from <this module> import *``: the state error first,
# followed by the transport and RPC classes.
__all__ = [
  'RpcStateError',
  'HttpTransport',
  'LocalTransport',
  'Rpc',
  'Transport',
]
47
48
class RpcStateError(messages.Error):
  """Raised when an RPC is asked to make an invalid state transition.

  Rpc raises this when a response or status is set on an RPC that is no
  longer in the RUNNING state, or when the requested new state is RUNNING.
  """
51
52
class Rpc(object):
  """Represents a client side RPC.

  An Rpc is created by a transport for a single request.  It starts in the
  RUNNING state with no response and leaves that state at most once, either
  through set_response (success) or through set_status (any other outcome).

  Blocking behavior is supplied by whoever creates the Rpc: wait() delegates
  to _wait_impl, which is expected to complete the RPC by calling
  set_response or set_status.
  """

  def __init__(self, request):
    """Constructor.

    Args:
      request: Request associated with this RPC.
    """
    self.__request = request
    self.__response = None
    self.__state = remote.RpcState.RUNNING
    self.__error_message = None
    self.__error_name = None

  @property
  def request(self):
    """Request associated with RPC."""
    return self.__request

  @property
  def response(self):
    """Response associated with RPC.

    Waits for the RPC to finish before returning.

    Raises:
      The error class that remote.RpcError.from_state maps the final state
      to, if it maps it to one.
    """
    self.wait()
    self.__check_status()
    return self.__response

  @property
  def state(self):
    """State associated with RPC.

    Unlike the other properties, reading the state does not wait.
    """
    return self.__state

  @property
  def error_message(self):
    """Error, if any, associated with RPC.  Waits for the RPC to finish."""
    self.wait()
    return self.__error_message

  @property
  def error_name(self):
    """Error name, if any, associated with RPC.  Waits for the RPC to finish."""
    self.wait()
    return self.__error_name

  def wait(self):
    """Wait for an RPC to finish.

    Does nothing when the RPC has already left the RUNNING state.
    """
    if self.__state == remote.RpcState.RUNNING:
      self._wait_impl()

  def _wait_impl(self):
    """Implementation for wait().

    The base implementation is abstract; the transports in this module replace
    it on each Rpc instance they create.

    Raises:
      NotImplementedError: Always, unless replaced.
    """
    raise NotImplementedError()

  def __check_status(self):
    """Raise the error matching the current state, if there is one.

    ApplicationError is constructed with both the error message and the error
    name; every other error class receives only the message.
    """
    error_class = remote.RpcError.from_state(self.__state)
    if error_class is not None:
      if error_class is remote.ApplicationError:
        raise error_class(self.__error_message, self.__error_name)
      else:
        raise error_class(self.__error_message)

  def __set_state(self, state, error_message=None, error_name=None):
    """Move the RPC out of the RUNNING state.

    Args:
      state: New state.  Must not be RUNNING.
      error_message: Error message to record, if any.
      error_name: Error name to record, if any.

    Raises:
      RpcStateError: If the RPC has already left the RUNNING state or if the
        requested state is RUNNING.
    """
    if self.__state != remote.RpcState.RUNNING:
      raise RpcStateError(
        'RPC must be in RUNNING state to change to %s' % state)
    if state == remote.RpcState.RUNNING:
      raise RpcStateError('RPC is already in RUNNING state')
    self.__state = state
    self.__error_message = error_message
    self.__error_name = error_name

  def set_response(self, response):
    """Complete the RPC successfully.

    Args:
      response: Response message for the RPC.

    Raises:
      TypeError: If response is not a messages.Message instance.
      RpcStateError: If the RPC is no longer in the RUNNING state.
    """
    # TODO: Even more specific type checking.
    if not isinstance(response, messages.Message):
      # Wrap the value in a 1-tuple so that a tuple-valued response is
      # formatted as one value rather than unpacked as the format arguments.
      raise TypeError('Expected Message type, received %r' % (response,))

    self.__response = response
    self.__set_state(remote.RpcState.OK)

  def set_status(self, status):
    """Complete the RPC using a status object.

    Args:
      status: Object providing check_initialized() and the attributes state,
        error_message and error_name (HttpTransport passes a remote.RpcStatus).

    Raises:
      RpcStateError: If the RPC is no longer in the RUNNING state or if the
        status state is RUNNING.
    """
    status.check_initialized()
    self.__set_state(status.state, status.error_message, status.error_name)
140
141
class Transport(object):
  """Base class for ProtoRPC transports.

  Supplies the plumbing shared by transports, such as one that sends and
  receives messages over HTTP.

  Subclasses override _start_rpc, which is given a RemoteInfo instance and a
  request Message.  The transport is expected to set the rpc response or
  raise an exception before termination.
  """

  @util.positional(1)
  def __init__(self, protocol=protobuf):
    """Constructor.

    Args:
      protocol: A string, a remote.ProtocolConfig, or a protocol object.
        A string is looked up in the default Protocols instance, first by
        name and then, if no such name exists, by content type.  A protocol
        object must implement encode_message and decode_message and define
        CONTENT_TYPE; the modules protobuf and protojson can be used directly.
    """
    if isinstance(protocol, six.string_types):
      registry = remote.Protocols.get_default()
      try:
        protocol = registry.lookup_by_name(protocol)
      except KeyError:
        protocol = registry.lookup_by_content_type(protocol)

    if isinstance(protocol, remote.ProtocolConfig):
      config, implementation = protocol, protocol.protocol
    else:
      # Bare protocol object: wrap it in a configuration named 'default'.
      implementation = protocol
      config = remote.ProtocolConfig(
        implementation, 'default',
        default_content_type=implementation.CONTENT_TYPE)

    self.__protocol = implementation
    self.__protocol_config = config

  @property
  def protocol(self):
    """Protocol implementation used by this transport."""
    return self.__protocol

  @property
  def protocol_config(self):
    """Protocol configuration used by this transport."""
    return self.__protocol_config

  def send_rpc(self, remote_info, request):
    """Initiate sending an RPC over the transport.

    The request is checked for initialization before it is handed to
    _start_rpc.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.
    """
    request.check_initialized()
    return self._start_rpc(remote_info, request)

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.  Must be overridden by subclasses.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.
    """
    raise NotImplementedError()
215
216
class HttpTransport(Transport):
  """Transport that sends each RPC to a service as an HTTP POST request."""

  @util.positional(2)
  def __init__(self, service_url, protocol=protobuf):
    """Constructor.

    Args:
      service_url: URL where the service is located.  All communication via
        the transport will go to this URL.
      protocol: The protocol implementation.  Must implement encode_message and
        decode_message.  Can also be an instance of remote.ProtocolConfig.
    """
    super(HttpTransport, self).__init__(protocol=protocol)
    self.__service_url = service_url

  def __get_rpc_status(self, response, content):
    """Build an RpcStatus describing a non-OK HTTP response.

    Args:
      response: HTTPResponse object.
      content: Content read from HTTP response.

    Returns:
      The RpcStatus decoded from the body when the response has status 400 or
      above, carries this transport's default content type and decodes to an
      initialized message.  Otherwise an RpcStatus in the SERVER_ERROR state
      with a generic HTTP error message.
    """
    # Only error responses sent with our own content type are candidates for
    # carrying an encoded RpcStatus.
    if (response.status >= 400 and
        response.getheader('content-type') ==
        self.protocol_config.default_content_type):
      try:
        parsed_status = self.protocol.decode_message(remote.RpcStatus, content)
      except Exception as parse_error:
        logging.warning(
          'An error occurred trying to parse status: %s\n%s',
          str(parse_error), content)
      else:
        if parsed_status.is_initialized():
          return parsed_status
        logging.warning(
          'Body does not result in an initialized RpcStatus message:\n%s',
          content)

    # No usable RpcStatus: forward the body as the error detail, or the
    # standard reason phrase for the status code when the body is blank.
    if content.strip():
      detail = content
    else:
      detail = six.moves.http_client.responses.get(response.status,
                                                   'Unknown Error')
    error_text = 'HTTP Error %s: %s' % (response.status,
                                        detail or 'Unknown Error')
    return remote.RpcStatus(state=remote.RpcState.SERVER_ERROR,
                            error_message=error_text)

  def __set_response(self, remote_info, connection, rpc):
    """Read the HTTP response and complete the RPC with it.

    Sets the response or the status on the RPC from the HTTP response.  This
    is what the wait method of Rpc instances created by this transport runs.
    The connection is closed whether or not reading succeeds.

    Args:
      remote_info: Remote info for invoked RPC.
      connection: HTTPConnection that is making request.
      rpc: Rpc instance.
    """
    try:
      http_response = connection.getresponse()
      body = http_response.read()

      if http_response.status != six.moves.http_client.OK:
        status = self.__get_rpc_status(http_response, body)
        rpc.set_status(status)
      else:
        message = self.protocol.decode_message(remote_info.response_type,
                                               body)
        rpc.set_response(message)
    finally:
      connection.close()

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.

    Sends the encoded request right away; the response is only read once the
    returned Rpc is waited on.

    Args:
      remote_info: A RemoteInfo instance for this RPC.
      request: The request message for this RPC.

    Returns:
      An Rpc instance initialized with a Request.

    Raises:
      remote.RpcError: Passed through unchanged when raised while sending.
      remote.NetworkError: For socket errors and any other exception raised
        while sending the request.
    """
    # The method name is appended to the service URL, separated by a period.
    target_url = '%s.%s' % (self.__service_url, remote_info.method.__name__)
    payload = self.protocol.encode_message(request)

    parsed = urlparse.urlparse(target_url)
    http_client = six.moves.http_client
    connection_class = (http_client.HTTPSConnection
                        if parsed.scheme == 'https'
                        else http_client.HTTPConnection)
    connection = connection_class(parsed.hostname, parsed.port)

    try:
      self._send_http_request(connection, parsed.path, payload)
      rpc = Rpc(request)
    except remote.RpcError:
      # Pass through all ProtoRPC errors
      connection.close()
      raise
    except socket.error as err:
      connection.close()
      description = 'Socket error: %s %r' % (type(err).__name__, err.args)
      raise remote.NetworkError(description, err)
    except Exception as err:
      connection.close()
      raise remote.NetworkError('Error communicating with HTTP server', err)

    # Every handler above raises, so this point is only reached on success.
    def read_response():
      return self.__set_response(remote_info, connection, rpc)

    rpc._wait_impl = read_response
    return rpc

  def _send_http_request(self, connection, http_path, encoded_request):
    """POST the encoded request on the given connection.

    Args:
      connection: HTTPConnection to send the request on.
      http_path: Path component of the URL to post to.
      encoded_request: Request message already encoded by the protocol.
    """
    request_headers = {
      'Content-type': self.protocol_config.default_content_type,
      'Content-length': len(encoded_request),
    }
    connection.request('POST',
                       http_path,
                       encoded_request,
                       headers=request_headers)
346
347
class LocalTransport(Transport):
  """Local transport that sends messages directly to services.

  Useful in tests or creating code that can work with either local or remote
  services.  Using LocalTransport is preferable to simply instantiating a
  single instance of a service and reusing it.  The entire request process
  involves instantiating a new instance of a service, initializing it with
  request state and then invoking the remote method for every request.
  """

  def __init__(self, service_factory):
    """Constructor.

    Args:
      service_factory: Service factory or class.  If it has a service_class
        attribute, that is reported as the service class; otherwise the
        factory itself is.
    """
    super(LocalTransport, self).__init__()
    self.__service_class = getattr(service_factory,
                                   'service_class',
                                   service_factory)
    self.__service_factory = service_factory

  @property
  def service_class(self):
    """Service class associated with this transport."""
    return self.__service_class

  @property
  def service_factory(self):
    """Callable used to create a new service instance for every request."""
    return self.__service_factory

  def _start_rpc(self, remote_info, request):
    """Start a remote procedure call.

    The service is not invoked here.  It is created and called when the
    returned Rpc is waited on.

    Args:
      remote_info: RemoteInfo instance describing remote method.
      request: Request message to send to service.

    Returns:
      An Rpc instance initialized with the request.  Waiting on it raises
      remote.ApplicationError unchanged if the service method raises it, and
      remote.ServerError for any other exception raised by the method or for
      a response that is not of the expected response type.
    """
    rpc = Rpc(request)
    def wait_impl():
      instance = self.__service_factory()
      try:
        initialize_request_state = instance.initialize_request_state
      except AttributeError:
        # Services without initialize_request_state are used as they are.
        pass
      else:
        try:
          host_name = os.uname()[1]
        except AttributeError:
          # os.uname does not exist on every platform (e.g. Windows).
          host_name = socket.gethostname()
        host = six.text_type(host_name)
        initialize_request_state(remote.RequestState(remote_host=host,
                                                     remote_address=u'127.0.0.1',
                                                     server_host=host,
                                                     server_port=-1))
      try:
        response = remote_info.method(instance, request)
        assert isinstance(response, remote_info.response_type)
      except remote.ApplicationError:
        raise
      except:
        # Deliberately broad: everything other than ApplicationError is
        # reported as a ServerError carrying the original traceback.
        exc_type, exc_value, traceback = sys.exc_info()
        message = 'Unexpected error %s: %s' % (exc_type.__name__, exc_value)
        # six.reraise needs an exception instance as its value on Python 3;
        # a plain string fails there with AttributeError.
        six.reraise(remote.ServerError, remote.ServerError(message), traceback)
      rpc.set_response(response)
    rpc._wait_impl = wait_impl
    return rpc
413