1"""Channel notifications support.
2
3Classes and functions to support channel subscriptions and notifications
4on those channels.
5
6Notes:
7  - This code is based on experimental APIs and is subject to change.
  - Notifications are not deduplicated by notification id; that is up to
    the receiver.
10  - Storing the Channel between calls is up to the caller.
11
12
13Example setting up a channel:
14
15  # Create a new channel that gets notifications via webhook.
16  channel = new_webhook_channel("https://example.com/my_web_hook")
17
18  # Store the channel, keyed by 'channel.id'. Store it before calling the
19  # watch method because notifications may start arriving before the watch
20  # method returns.
21  ...
22
23  resp = service.objects().watchAll(
24    bucket="some_bucket_id", body=channel.body()).execute()
25  channel.update(resp)
26
27  # Store the channel, keyed by 'channel.id'. Store it after being updated
28  # since the resource_id value will now be correct, and that's needed to
29  # stop a subscription.
30  ...
31
32
33An example Webhook implementation using webapp2. Note that webapp2 puts
34headers in a case insensitive dictionary, as headers aren't guaranteed to
35always be upper case.
36
37  id = self.request.headers[X_GOOG_CHANNEL_ID]
38
39  # Retrieve the channel by id.
40  channel = ...
41
42  # Parse notification from the headers, including validating the id.
43  n = notification_from_headers(channel, self.request.headers)
44
45  # Do app specific stuff with the notification here.
  if n.state == 'sync':
    # Code to handle sync state.
  elif n.state == 'exists':
    # Code to handle the exists state.
  elif n.state == 'not_exists':
    # Code to handle the not exists state.
52
53
54Example of unsubscribing.
55
  service.channels().stop(body=channel.body()).execute()
57"""
58from __future__ import absolute_import
59
60import datetime
61import uuid
62
63from googleapiclient import errors
64from oauth2client import util
65import six
66
67
# The Unix epoch (midnight, 1 January 1970) as a naive datetime. It is built
# with the explicit constructor rather than datetime.utcfromtimestamp(0),
# which is deprecated as of Python 3.12; the resulting value is identical.
EPOCH = datetime.datetime(1970, 1, 1)

# Map the names of the parameters in the JSON channel description to
# the parameter names we use in the Channel class.
CHANNEL_PARAMS = {
    'address': 'address',
    'id': 'id',
    'expiration': 'expiration',
    'params': 'params',
    'resourceId': 'resource_id',
    'resourceUri': 'resource_uri',
    'type': 'type',
    'token': 'token',
}

# Upper-cased names of the HTTP headers read when parsing a notification.
X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID'
X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI'
X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID'
89
90
def _upper_header_keys(headers):
  """Return a plain dict copy of headers with every key upper-cased.

  Args:
    headers: dict, A dictionary like object mapping header names to values.

  Returns:
    A new dict holding the same values keyed by the upper-cased names. If
    two names differ only by case, the one iterated last wins.
  """
  return {name.upper(): value for name, value in six.iteritems(headers)}
96
97
class Notification(object):
  """A single notification delivered over a Channel.

  Instances are normally obtained from helpers such as
  notification_from_headers() rather than being constructed by hand.

  Attributes:
    message_number: int, The unique id number of this notification.
    state: str, The state of the resource being monitored.
    resource_uri: str, The address of the resource being monitored.
    resource_id: str, The identifier of the watched resource.
  """

  @util.positional(5)
  def __init__(self, message_number, state, resource_uri, resource_id):
    """Store the fields of a notification on the new instance.

    Args:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored. Can be one
        of "exists", "not_exists", or "sync".
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """
    # The values are stored as given; no validation or conversion is done.
    self.message_number, self.state = message_number, state
    self.resource_uri, self.resource_id = resource_uri, resource_id
126
127
class Channel(object):
  """A subscription channel over which notifications are delivered.

  Callers normally obtain a Channel from a helper such as
  new_webhook_channel() instead of calling the constructor themselves.

  Attributes:
    type: str, The type of delivery mechanism used by this channel. For
      example, 'web_hook'.
    id: str, A UUID for the channel.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each event delivered
      over this channel.
    address: str, The address of the receiving entity where events are
      delivered. Specific to the channel type.
    expiration: int, The time, in milliseconds from the epoch, when this
      channel will expire.
    params: dict, A dictionary of string to string, with additional parameters
      controlling delivery channel behavior.
    resource_id: str, An opaque id that identifies the resource that is
      being watched. Stable across different API versions.
    resource_uri: str, The canonicalized ID of the watched resource.
  """

  @util.positional(5)
  def __init__(self, type, id, token, address, expiration=None,
               params=None, resource_id="", resource_uri=""):
    """Initialise a Channel from its individual fields.

    User code rarely calls this directly; the per-type factory functions
    take a more convenient set of arguments and build the Channel for you.

    Args:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional parameters
        controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """
    # Fields that are always part of the request body.
    self.type = type
    self.id = id
    self.token = token
    self.address = address

    # Fields that are only sent when they hold a truthy value.
    self.expiration = expiration
    self.params = params
    self.resource_id = resource_id
    self.resource_uri = resource_uri

  def body(self):
    """Build the dictionary form of this Channel.

    The result is suitable for passing as the body argument of watch()
    methods.

    Returns:
      A dict that always has the 'id', 'token', 'type' and 'address' keys.
      'params', 'resourceId', 'resourceUri' and 'expiration' are added only
      when the corresponding attribute is truthy.
    """
    payload = {
        'id': self.id,
        'token': self.token,
        'type': self.type,
        'address': self.address,
    }

    # (JSON key, current value) pairs for the optional fields, in the order
    # in which they are added to the payload.
    optional_fields = (
        ('params', self.params),
        ('resourceId', self.resource_id),
        ('resourceUri', self.resource_uri),
        ('expiration', self.expiration),
    )
    for json_key, current in optional_fields:
      if current:
        payload[json_key] = current

    return payload

  def update(self, resp):
    """Copy channel information from a watch() response onto this Channel.

    The response to a watch() request is a dictionary with updated channel
    information, such as the resource_id, which is needed when stopping a
    subscription.

    Args:
      resp: dict, The response from a watch() method.
    """
    for json_key in CHANNEL_PARAMS:
      new_value = resp.get(json_key)
      if new_value is None:
        # Key absent from the response (or None): keep the current value.
        continue
      setattr(self, CHANNEL_PARAMS[json_key], new_value)
227
228
def notification_from_headers(channel, headers):
  """Build a validated Notification from webhook request headers.

  Header names are matched case-insensitively: they are upper-cased before
  the X-GOOG-* values are looked up.

  Args:
    channel: Channel, The channel that the notification is associated with.
    headers: dict, A dictionary like object that contains the request headers
      from the webhook HTTP request.

  Returns:
    A Notification object.

  Raises:
    errors.InvalidNotificationError if the channel id in the headers does
      not match channel.id.
    ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
    KeyError if one of the X-GOOG-* headers read here is missing.
  """
  upper = _upper_header_keys(headers)

  # Reject notifications addressed to a different channel before reading
  # any of the other headers.
  received_id = upper[X_GOOG_CHANNEL_ID]
  if channel.id != received_id:
    raise errors.InvalidNotificationError(
        'Channel id mismatch: %s != %s' % (channel.id, received_id))

  return Notification(
      int(upper[X_GOOG_MESSAGE_NUMBER]),
      upper[X_GOOG_RESOURCE_STATE],
      upper[X_GOOG_RESOURCE_URI],
      upper[X_GOOG_RESOURCE_ID])
256
257
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
  """Create a new webhook Channel.

  Args:
    url: str, URL to post notifications to.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each notification delivered
      over this channel.
    expiration: datetime.datetime, A time in the future when the channel
      should expire. Can also be None if the subscription should use the
      default expiration. Note that different services may have different
      limits on how long a subscription lasts. Check the response from the
      watch() method to see the value the service has set for an expiration
      time.
    params: dict, Extra parameters to pass on channel creation. Currently
      not used for webhook channels.

  Returns:
    A Channel of type 'web_hook' whose id is a newly generated UUID string
    and whose expiration is an int number of milliseconds since the epoch
    (0 when no expiration was given or when it lies before the epoch).
  """
  expiration_ms = 0
  if expiration:
    delta = expiration - EPOCH
    # Use floor division so the result stays an int on Python 3 as well;
    # true division would turn the whole expression into a float.
    expiration_ms = delta.microseconds // 1000 + (
        delta.seconds + delta.days * 24 * 3600) * 1000
    # Times before the epoch are clamped to 0.
    if expiration_ms < 0:
      expiration_ms = 0

  return Channel('web_hook', str(uuid.uuid4()),
                 token, url, expiration=expiration_ms,
                 params=params)
287
288