1# Copyright 2014 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Channel notifications support. 16 17Classes and functions to support channel subscriptions and notifications 18on those channels. 19 20Notes: 21 - This code is based on experimental APIs and is subject to change. 22 - Notification does not do deduplication of notification ids, that's up to 23 the receiver. 24 - Storing the Channel between calls is up to the caller. 25 26 27Example setting up a channel: 28 29 # Create a new channel that gets notifications via webhook. 30 channel = new_webhook_channel("https://example.com/my_web_hook") 31 32 # Store the channel, keyed by 'channel.id'. Store it before calling the 33 # watch method because notifications may start arriving before the watch 34 # method returns. 35 ... 36 37 resp = service.objects().watchAll( 38 bucket="some_bucket_id", body=channel.body()).execute() 39 channel.update(resp) 40 41 # Store the channel, keyed by 'channel.id'. Store it after being updated 42 # since the resource_id value will now be correct, and that's needed to 43 # stop a subscription. 44 ... 45 46 47An example Webhook implementation using webapp2. Note that webapp2 puts 48headers in a case insensitive dictionary, as headers aren't guaranteed to 49always be upper case. 50 51 id = self.request.headers[X_GOOG_CHANNEL_ID] 52 53 # Retrieve the channel by id. 54 channel = ... 55 56 # Parse notification from the headers, including validating the id. 57 n = notification_from_headers(channel, self.request.headers) 58 59 # Do app specific stuff with the notification here. 60 if n.resource_state == 'sync': 61 # Code to handle sync state. 62 elif n.resource_state == 'exists': 63 # Code to handle the exists state. 64 elif n.resource_state == 'not_exists': 65 # Code to handle the not exists state. 66 67 68Example of unsubscribing. 69 70 service.channels().stop(channel.body()).execute() 71""" 72from __future__ import absolute_import 73 74import datetime 75import uuid 76 77from googleapiclient import errors 78from googleapiclient import _helpers as util 79import six 80 81 82# The unix time epoch starts at midnight 1970. 83EPOCH = datetime.datetime.utcfromtimestamp(0) 84 85# Map the names of the parameters in the JSON channel description to 86# the parameter names we use in the Channel class. 87CHANNEL_PARAMS = { 88 "address": "address", 89 "id": "id", 90 "expiration": "expiration", 91 "params": "params", 92 "resourceId": "resource_id", 93 "resourceUri": "resource_uri", 94 "type": "type", 95 "token": "token", 96} 97 98X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID" 99X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER" 100X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE" 101X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI" 102X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID" 103 104 105def _upper_header_keys(headers): 106 new_headers = {} 107 for k, v in six.iteritems(headers): 108 new_headers[k.upper()] = v 109 return new_headers 110 111 112class Notification(object): 113 """A Notification from a Channel. 114 115 Notifications are not usually constructed directly, but are returned 116 from functions like notification_from_headers(). 117 118 Attributes: 119 message_number: int, The unique id number of this notification. 120 state: str, The state of the resource being monitored. 121 uri: str, The address of the resource being monitored. 122 resource_id: str, The unique identifier of the version of the resource at 123 this event. 124 """ 125 126 @util.positional(5) 127 def __init__(self, message_number, state, resource_uri, resource_id): 128 """Notification constructor. 129 130 Args: 131 message_number: int, The unique id number of this notification. 132 state: str, The state of the resource being monitored. Can be one 133 of "exists", "not_exists", or "sync". 134 resource_uri: str, The address of the resource being monitored. 135 resource_id: str, The identifier of the watched resource. 136 """ 137 self.message_number = message_number 138 self.state = state 139 self.resource_uri = resource_uri 140 self.resource_id = resource_id 141 142 143class Channel(object): 144 """A Channel for notifications. 145 146 Usually not constructed directly, instead it is returned from helper 147 functions like new_webhook_channel(). 148 149 Attributes: 150 type: str, The type of delivery mechanism used by this channel. For 151 example, 'web_hook'. 152 id: str, A UUID for the channel. 153 token: str, An arbitrary string associated with the channel that 154 is delivered to the target address with each event delivered 155 over this channel. 156 address: str, The address of the receiving entity where events are 157 delivered. Specific to the channel type. 158 expiration: int, The time, in milliseconds from the epoch, when this 159 channel will expire. 160 params: dict, A dictionary of string to string, with additional parameters 161 controlling delivery channel behavior. 162 resource_id: str, An opaque id that identifies the resource that is 163 being watched. Stable across different API versions. 164 resource_uri: str, The canonicalized ID of the watched resource. 165 """ 166 167 @util.positional(5) 168 def __init__( 169 self, 170 type, 171 id, 172 token, 173 address, 174 expiration=None, 175 params=None, 176 resource_id="", 177 resource_uri="", 178 ): 179 """Create a new Channel. 180 181 In user code, this Channel constructor will not typically be called 182 manually since there are functions for creating channels for each specific 183 type with a more customized set of arguments to pass. 184 185 Args: 186 type: str, The type of delivery mechanism used by this channel. For 187 example, 'web_hook'. 188 id: str, A UUID for the channel. 189 token: str, An arbitrary string associated with the channel that 190 is delivered to the target address with each event delivered 191 over this channel. 192 address: str, The address of the receiving entity where events are 193 delivered. Specific to the channel type. 194 expiration: int, The time, in milliseconds from the epoch, when this 195 channel will expire. 196 params: dict, A dictionary of string to string, with additional parameters 197 controlling delivery channel behavior. 198 resource_id: str, An opaque id that identifies the resource that is 199 being watched. Stable across different API versions. 200 resource_uri: str, The canonicalized ID of the watched resource. 201 """ 202 self.type = type 203 self.id = id 204 self.token = token 205 self.address = address 206 self.expiration = expiration 207 self.params = params 208 self.resource_id = resource_id 209 self.resource_uri = resource_uri 210 211 def body(self): 212 """Build a body from the Channel. 213 214 Constructs a dictionary that's appropriate for passing into watch() 215 methods as the value of body argument. 216 217 Returns: 218 A dictionary representation of the channel. 219 """ 220 result = { 221 "id": self.id, 222 "token": self.token, 223 "type": self.type, 224 "address": self.address, 225 } 226 if self.params: 227 result["params"] = self.params 228 if self.resource_id: 229 result["resourceId"] = self.resource_id 230 if self.resource_uri: 231 result["resourceUri"] = self.resource_uri 232 if self.expiration: 233 result["expiration"] = self.expiration 234 235 return result 236 237 def update(self, resp): 238 """Update a channel with information from the response of watch(). 239 240 When a request is sent to watch() a resource, the response returned 241 from the watch() request is a dictionary with updated channel information, 242 such as the resource_id, which is needed when stopping a subscription. 243 244 Args: 245 resp: dict, The response from a watch() method. 246 """ 247 for json_name, param_name in six.iteritems(CHANNEL_PARAMS): 248 value = resp.get(json_name) 249 if value is not None: 250 setattr(self, param_name, value) 251 252 253def notification_from_headers(channel, headers): 254 """Parse a notification from the webhook request headers, validate 255 the notification, and return a Notification object. 256 257 Args: 258 channel: Channel, The channel that the notification is associated with. 259 headers: dict, A dictionary like object that contains the request headers 260 from the webhook HTTP request. 261 262 Returns: 263 A Notification object. 264 265 Raises: 266 errors.InvalidNotificationError if the notification is invalid. 267 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. 268 """ 269 headers = _upper_header_keys(headers) 270 channel_id = headers[X_GOOG_CHANNEL_ID] 271 if channel.id != channel_id: 272 raise errors.InvalidNotificationError( 273 "Channel id mismatch: %s != %s" % (channel.id, channel_id) 274 ) 275 else: 276 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) 277 state = headers[X_GOOG_RESOURCE_STATE] 278 resource_uri = headers[X_GOOG_RESOURCE_URI] 279 resource_id = headers[X_GOOG_RESOURCE_ID] 280 return Notification(message_number, state, resource_uri, resource_id) 281 282 283@util.positional(2) 284def new_webhook_channel(url, token=None, expiration=None, params=None): 285 """Create a new webhook Channel. 286 287 Args: 288 url: str, URL to post notifications to. 289 token: str, An arbitrary string associated with the channel that 290 is delivered to the target address with each notification delivered 291 over this channel. 292 expiration: datetime.datetime, A time in the future when the channel 293 should expire. Can also be None if the subscription should use the 294 default expiration. Note that different services may have different 295 limits on how long a subscription lasts. Check the response from the 296 watch() method to see the value the service has set for an expiration 297 time. 298 params: dict, Extra parameters to pass on channel creation. Currently 299 not used for webhook channels. 300 """ 301 expiration_ms = 0 302 if expiration: 303 delta = expiration - EPOCH 304 expiration_ms = ( 305 delta.microseconds / 1000 + (delta.seconds + delta.days * 24 * 3600) * 1000 306 ) 307 if expiration_ms < 0: 308 expiration_ms = 0 309 310 return Channel( 311 "web_hook", 312 str(uuid.uuid4()), 313 token, 314 url, 315 expiration=expiration_ms, 316 params=params, 317 ) 318