1# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Channel notifications support.
16
17Classes and functions to support channel subscriptions and notifications
18on those channels.
19
20Notes:
21  - This code is based on experimental APIs and is subject to change.
  - Notifications are not deduplicated by notification id; that is up to
    the receiver.
24  - Storing the Channel between calls is up to the caller.
25
26
27Example setting up a channel:
28
29  # Create a new channel that gets notifications via webhook.
30  channel = new_webhook_channel("https://example.com/my_web_hook")
31
32  # Store the channel, keyed by 'channel.id'. Store it before calling the
33  # watch method because notifications may start arriving before the watch
34  # method returns.
35  ...
36
37  resp = service.objects().watchAll(
38    bucket="some_bucket_id", body=channel.body()).execute()
39  channel.update(resp)
40
41  # Store the channel, keyed by 'channel.id'. Store it after being updated
42  # since the resource_id value will now be correct, and that's needed to
43  # stop a subscription.
44  ...
45
46
47An example Webhook implementation using webapp2. Note that webapp2 puts
48headers in a case insensitive dictionary, as headers aren't guaranteed to
49always be upper case.
50
51  id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53  # Retrieve the channel by id.
54  channel = ...
55
56  # Parse notification from the headers, including validating the id.
57  n = notification_from_headers(channel, self.request.headers)
58
59  # Do app specific stuff with the notification here.
  if n.state == 'sync':
    # Code to handle sync state.
  elif n.state == 'exists':
    # Code to handle the exists state.
  elif n.state == 'not_exists':
    # Code to handle the not exists state.
66
67
68Example of unsubscribing.
69
  service.channels().stop(body=channel.body()).execute()
71"""
72from __future__ import absolute_import
73
74import datetime
75import uuid
76
77from googleapiclient import errors
78from googleapiclient import _helpers as util
79import six
80
81
# The unix time epoch starts at midnight 1970. It is kept as a naive datetime
# because new_webhook_channel() subtracts it from the expiration it is given.
# Built directly rather than with datetime.utcfromtimestamp(0), which is
# deprecated as of Python 3.12; the resulting value is the same.
EPOCH = datetime.datetime(1970, 1, 1)

# Map the names of the parameters in the JSON channel description to
# the parameter names we use in the Channel class.
CHANNEL_PARAMS = {
    "address": "address",
    "id": "id",
    "expiration": "expiration",
    "params": "params",
    "resourceId": "resource_id",
    "resourceUri": "resource_uri",
    "type": "type",
    "token": "token",
}

# Names of the webhook request headers read by notification_from_headers().
# They are spelled in upper case because incoming header names are
# upper-cased before lookup.
X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID"
X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER"
X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE"
X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI"
X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID"
103
104
def _upper_header_keys(headers):
    """Return a copy of the headers with every key converted to upper case.

    Args:
      headers: dict, A dictionary like object mapping header names to values.

    Returns:
      A new plain dict holding the same values under upper-cased keys. If two
      keys differ only by case, the one iterated last wins.
    """
    return {name.upper(): value for name, value in six.iteritems(headers)}
110
111
class Notification(object):
    """A Notification from a Channel.

    Notifications are not usually constructed directly, but are returned
    from functions like notification_from_headers().

    Attributes:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored.
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """

    @util.positional(5)
    def __init__(self, message_number, state, resource_uri, resource_id):
        """Notification constructor.

        Args:
          message_number: int, The unique id number of this notification.
          state: str, The state of the resource being monitored. Can be one
            of "exists", "not_exists", or "sync".
          resource_uri: str, The address of the resource being monitored.
          resource_id: str, The identifier of the watched resource.
        """
        self.message_number = message_number
        self.state = state
        self.resource_uri = resource_uri
        self.resource_id = resource_id

    def __repr__(self):
        """Return a debugging representation listing all four attributes."""
        return "%s(message_number=%r, state=%r, resource_uri=%r, resource_id=%r)" % (
            type(self).__name__,
            self.message_number,
            self.state,
            self.resource_uri,
            self.resource_id,
        )
141
142
class Channel(object):
    """A notification Channel.

    Instances are normally obtained from helper functions such as
    new_webhook_channel() rather than built by hand.

    Attributes:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional
        parameters controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """

    @util.positional(5)
    def __init__(
        self,
        type,
        id,
        token,
        address,
        expiration=None,
        params=None,
        resource_id="",
        resource_uri="",
    ):
        """Create a new Channel.

        User code rarely calls this constructor directly; each channel type
        has its own factory function that takes a more convenient set of
        arguments.

        Args:
          type: str, The type of delivery mechanism used by this channel. For
            example, 'web_hook'.
          id: str, A UUID for the channel.
          token: str, An arbitrary string associated with the channel that
            is delivered to the target address with each event delivered
            over this channel.
          address: str, The address of the receiving entity where events are
            delivered. Specific to the channel type.
          expiration: int, The time, in milliseconds from the epoch, when this
            channel will expire.
          params: dict, A dictionary of string to string, with additional
            parameters controlling delivery channel behavior.
          resource_id: str, An opaque id that identifies the resource that is
            being watched. Stable across different API versions.
          resource_uri: str, The canonicalized ID of the watched resource.
        """
        # Identity and delivery settings.
        self.type = type
        self.id = id
        self.token = token
        self.address = address
        self.expiration = expiration
        self.params = params
        # Filled in from the watch() response via update().
        self.resource_id = resource_id
        self.resource_uri = resource_uri

    def body(self):
        """Build a request body from the Channel.

        The result is suitable for passing to watch() methods as the value
        of the body argument.

        Returns:
          A dictionary representation of the channel. "id", "token", "type"
          and "address" are always present; "params", "resourceId",
          "resourceUri" and "expiration" appear only when their value is
          truthy.
        """
        payload = {
            "id": self.id,
            "token": self.token,
            "type": self.type,
            "address": self.address,
        }
        optional_fields = (
            ("params", self.params),
            ("resourceId", self.resource_id),
            ("resourceUri", self.resource_uri),
            ("expiration", self.expiration),
        )
        for json_name, value in optional_fields:
            if value:
                payload[json_name] = value

        return payload

    def update(self, resp):
        """Update the channel with information from a watch() response.

        The response to a watch() request is a dictionary with updated
        channel information, such as the resource_id, which is needed when
        stopping a subscription. Every known field that is present and not
        None in the response overwrites the matching attribute.

        Args:
          resp: dict, The response from a watch() method.
        """
        for json_name, attr_name in six.iteritems(CHANNEL_PARAMS):
            new_value = resp.get(json_name)
            if new_value is None:
                # Field missing from the response: keep the current value.
                continue
            setattr(self, attr_name, new_value)
251
252
def notification_from_headers(channel, headers):
    """Build a validated Notification from webhook request headers.

    Header names are matched case-insensitively: they are upper-cased before
    the X-GOOG-* values are looked up.

    Args:
      channel: Channel, The channel that the notification is associated with.
      headers: dict, A dictionary like object that contains the request headers
        from the webhook HTTP request.

    Returns:
      A Notification object.

    Raises:
      errors.InvalidNotificationError if the channel id in the headers does
        not match channel.id.
      ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
      KeyError if one of the expected X-GOOG-* headers is missing.
    """
    normalized = _upper_header_keys(headers)

    received_id = normalized[X_GOOG_CHANNEL_ID]
    if channel.id != received_id:
        raise errors.InvalidNotificationError(
            "Channel id mismatch: %s != %s" % (channel.id, received_id)
        )

    return Notification(
        int(normalized[X_GOOG_MESSAGE_NUMBER]),
        normalized[X_GOOG_RESOURCE_STATE],
        normalized[X_GOOG_RESOURCE_URI],
        normalized[X_GOOG_RESOURCE_ID],
    )
281
282
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
    """Create a new webhook Channel.

    Args:
      url: str, URL to post notifications to.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each notification delivered
        over this channel.
      expiration: datetime.datetime, A time in the future when the channel
        should expire. A naive datetime is interpreted as UTC; a timezone
        aware datetime is converted to UTC first. Can also be None if the
        subscription should use the default expiration. Note that different
        services may have different limits on how long a subscription lasts.
        Check the response from the watch() method to see the value the
        service has set for an expiration time.
      params: dict, Extra parameters to pass on channel creation. Currently
        not used for webhook channels.

    Returns:
      A Channel of type "web_hook" with a newly generated UUID as its id. Its
      expiration is an int number of milliseconds since the epoch, or 0 when
      no expiration was given or the given time lies before the epoch.
    """
    expiration_ms = 0
    if expiration:
        # EPOCH is naive, and Python refuses to subtract naive and aware
        # datetimes, so normalize an aware expiration to naive UTC first.
        utc_offset = expiration.utcoffset()
        if utc_offset is not None:
            expiration = expiration.replace(tzinfo=None) - utc_offset

        delta = expiration - EPOCH
        # Floor division keeps the result an int; "/" is true division on
        # Python 3 and would turn the millisecond count into a float.
        expiration_ms = (
            delta.microseconds // 1000
            + (delta.seconds + delta.days * 24 * 3600) * 1000
        )
        # Times before the epoch are treated as "no expiration".
        if expiration_ms < 0:
            expiration_ms = 0

    return Channel(
        "web_hook",
        str(uuid.uuid4()),
        token,
        url,
        expiration=expiration_ms,
        params=params,
    )
318