#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Datastore models used by the Google App Engine Pipeline API."""

from google.appengine.ext import db
from google.appengine.ext import blobstore

try:
  import json
except ImportError:
  import simplejson as json

# Relative imports
import util


def _decode_json_payload(blob_ref, text):
  """Decodes a JSON payload stored either in a blob or in a text property.

  Args:
    blob_ref: Value of a BlobReferenceProperty, or None when the payload is
      stored inline.
    text: Value of the inline text property; only used when blob_ref is None.

  Returns:
    The object produced by json.loads() using util.JsonDecoder.
  """
  if blob_ref is not None:
    reader = blob_ref.open()
    # Always release the reader, even if read() raises; try/finally is used
    # instead of 'with' so the module stays valid on older Python 2 runtimes.
    try:
      encoded = reader.read()
    finally:
      reader.close()
  else:
    encoded = text
  return json.loads(encoded, cls=util.JsonDecoder)


class _PipelineRecord(db.Model):
  """Represents a Pipeline.

  Key name is a randomly assigned UUID. No parent entity.

  Properties:
    class_path: Path of the Python class to use for this pipeline.
    root_pipeline: The root of the whole workflow; set to itself if this
      pipeline is its own root.
    fanned_out: List of child _PipelineRecords that were started when this
      generator pipeline moved from WAITING to RUN.
    start_time: For pipelines with no start _BarrierRecord, when this pipeline
      was enqueued to run immediately.
    finalized_time: When this pipeline moved from WAITING or RUN to DONE.
    params: Serialized parameter dictionary.
    status: The current status of the pipeline.
    current_attempt: The current attempt (starting at 0) to run.
    max_attempts: Maximum number of attempts (starting at 0) to run.
    next_retry_time: ETA of the next retry attempt.
    retry_message: Why the last attempt failed; None or empty if no message.

  Root pipeline properties:
    is_root_pipeline: This is a root pipeline.
    abort_message: Why the whole pipeline was aborted; only saved on
      root pipelines.
    abort_requested: If an abort signal has been requested for this root
      pipeline; only saved on root pipelines.
  """

  WAITING = 'waiting'
  RUN = 'run'
  DONE = 'done'
  ABORTED = 'aborted'

  class_path = db.StringProperty()
  root_pipeline = db.SelfReferenceProperty(
      collection_name='child_pipelines_set')
  fanned_out = db.ListProperty(db.Key, indexed=False)
  start_time = db.DateTimeProperty(indexed=True)
  finalized_time = db.DateTimeProperty(indexed=False)

  # One of these two will be set, depending on the size of the params.
  params_text = db.TextProperty(name='params')
  params_blob = blobstore.BlobReferenceProperty(
      name='params_blob', indexed=False)

  status = db.StringProperty(choices=(WAITING, RUN, DONE, ABORTED),
                             default=WAITING)

  # Retry behavior
  current_attempt = db.IntegerProperty(default=0, indexed=False)
  max_attempts = db.IntegerProperty(default=1, indexed=False)
  next_retry_time = db.DateTimeProperty(indexed=False)
  retry_message = db.TextProperty()

  # Root pipeline properties
  is_root_pipeline = db.BooleanProperty()
  abort_message = db.TextProperty()
  abort_requested = db.BooleanProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for pipeline records."""
    return '_AE_Pipeline_Record'

  @property
  def params(self):
    """Returns the dictionary of parameters for this Pipeline.

    The decoded value is cached on the instance after the first access, so
    the blob/text payload is only read and parsed once per entity object.
    """
    if hasattr(self, '_params_decoded'):
      return self._params_decoded

    value = _decode_json_payload(self.params_blob, self.params_text)
    if isinstance(value, dict):
      kwargs = value.get('kwargs')
      if kwargs:
        adjusted_kwargs = {}
        # items() instead of iteritems(): same result here, and it does not
        # tie this code to Python 2's dict API.
        for arg_key, arg_value in kwargs.items():
          # Python only allows non-unicode strings as keyword arguments.
          adjusted_kwargs[str(arg_key)] = arg_value
        value['kwargs'] = adjusted_kwargs

    self._params_decoded = value
    return self._params_decoded


class _SlotRecord(db.Model):
  """Represents an output slot.

  Key name is a randomly assigned UUID. No parent for slots of child pipelines.
  For the outputs of root pipelines, the parent entity is the root
  _PipelineRecord (see Pipeline.start()).

  Properties:
    root_pipeline: The root of the workflow.
    filler: The pipeline that filled this slot.
    value: Serialized value for this slot.
    status: The current status of the slot.
    fill_time: When the slot was filled by the filler.
  """

  FILLED = 'filled'
  WAITING = 'waiting'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  filler = db.ReferenceProperty(_PipelineRecord,
                                collection_name='filled_slots_set')

  # One of these two will be set, depending on the size of the value.
  value_text = db.TextProperty(name='value')
  value_blob = blobstore.BlobReferenceProperty(
      name='value_blob', indexed=False)

  status = db.StringProperty(choices=(FILLED, WAITING), default=WAITING,
                             indexed=False)
  fill_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for slot records."""
    return '_AE_Pipeline_Slot'

  @property
  def value(self):
    """Returns the value of this Slot.

    The decoded value is cached on the instance after the first access.
    """
    if hasattr(self, '_value_decoded'):
      return self._value_decoded

    self._value_decoded = _decode_json_payload(
        self.value_blob, self.value_text)
    return self._value_decoded


class _BarrierRecord(db.Model):
  """Represents a barrier.

  Key name is the purpose of the barrier (START or FINALIZE). Parent entity
  is the _PipelineRecord the barrier should trigger when all of its
  blocking_slots are filled.

  Properties:
    root_pipeline: The root of the workflow.
    target: The pipeline to run when the barrier fires.
    blocking_slots: The slots that must be filled before this barrier fires.
    trigger_time: When this barrier fired.
    status: The current status of the barrier.
  """

  # Barrier statuses
  FIRED = 'fired'
  WAITING = 'waiting'

  # Barrier trigger reasons (used as key names)
  START = 'start'
  FINALIZE = 'finalize'
  ABORT = 'abort'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  target = db.ReferenceProperty(_PipelineRecord,
                                collection_name='called_barrier_set')
  blocking_slots = db.ListProperty(db.Key)
  trigger_time = db.DateTimeProperty(indexed=False)
  status = db.StringProperty(choices=(FIRED, WAITING), default=WAITING,
                             indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for barrier records."""
    return '_AE_Pipeline_Barrier'


class _BarrierIndex(db.Model):
  """Indicates a _BarrierRecord that is dependent on a slot.

  Previously, when a _SlotRecord was filled, notify_barriers() would query for
  all _BarrierRecords where the 'blocking_slots' property equals the
  _SlotRecord's key. The problem with that approach is the 'blocking_slots'
  index is eventually consistent, meaning _BarrierRecords that were just written
  will not match the query. When pipelines are created and barriers are notified
  in rapid succession, the inconsistent queries can cause certain barriers never
  to fire. The outcome is a pipeline is WAITING and never RUN, even though all
  of its dependent slots have been filled.

  This entity is used to make it so barrier fan-out is fully consistent
  with the High Replication Datastore. It's used by notify_barriers() to
  do fully consistent ancestor queries every time a slot is filled. This
  ensures that all _BarrierRecords dependent on a _SlotRecord will
  be found regardless of eventual consistency.

  The key path for _BarrierIndexes is this for root entities:

    _PipelineRecord<owns_slot_id>/_SlotRecord<slot_id>/
        _PipelineRecord<dependent_pipeline_id>/_BarrierIndex<purpose>

  And this for child pipelines:

    _SlotRecord<slot_id>/_PipelineRecord<dependent_pipeline_id>/
        _BarrierIndex<purpose>

  That path is translated to the _BarrierRecord it should fire:

    _PipelineRecord<dependent_pipeline_id>/_BarrierRecord<purpose>

  All queries for _BarrierIndexes are key-only and thus the model requires
  no properties or helper methods.
  """

  # Enable this entity to be cleaned up.
  root_pipeline = db.ReferenceProperty(_PipelineRecord)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for barrier index entities."""
    return '_AE_Barrier_Index'

  @classmethod
  def to_barrier_key(cls, barrier_index_key):
    """Converts a _BarrierIndex key to a _BarrierRecord key.

    Args:
      barrier_index_key: db.Key for a _BarrierIndex entity.

    Returns:
      db.Key for the corresponding _BarrierRecord entity.
    """
    barrier_index_path = barrier_index_key.to_path()

    # Pick out the items from the _BarrierIndex key path that we need to
    # construct the _BarrierRecord key path. Only the trailing two
    # (kind, name) pairs matter, so both documented key layouts work.
    (pipeline_kind, dependent_pipeline_id,
     unused_kind, purpose) = barrier_index_path[-4:]

    barrier_record_path = (
        pipeline_kind, dependent_pipeline_id,
        _BarrierRecord.kind(), purpose)

    return db.Key.from_path(*barrier_record_path)


class _StatusRecord(db.Model):
  """Represents the current status of a pipeline.

  Properties:
    message: The textual message to show.
    console_url: URL to iframe as the primary console for this pipeline.
    link_names: Human display names for status links.
    link_urls: URLs corresponding to human names for status links.
    status_time: When the status was written.
  """

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  message = db.TextProperty()
  console_url = db.TextProperty()
  link_names = db.ListProperty(db.Text, indexed=False)
  link_urls = db.ListProperty(db.Text, indexed=False)
  status_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for status records."""
    return '_AE_Pipeline_Status'