#!/usr/bin/env python
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base handler class for all mapreduce handlers."""



# pylint: disable=protected-access
# pylint: disable=g-bad-name
# pylint: disable=g-import-not-at-top

import httplib
import logging

try:
  import json
except ImportError:
  import simplejson as json

try:
  from mapreduce import pipeline_base
except ImportError:
  pipeline_base = None
try:
  # Check if the full cloudstorage package exists. The stub part is in runtime.
  import cloudstorage
  if hasattr(cloudstorage, "_STUB"):
    cloudstorage = None
except ImportError:
  cloudstorage = None

from google.appengine.ext import webapp
from mapreduce import errors
from mapreduce import json_util
from mapreduce import model
from mapreduce import parameters


class Error(Exception):
  """Base-class for exceptions in this module."""


class BadRequestPathError(Error):
  """The request path for the handler is invalid."""


class TaskQueueHandler(webapp.RequestHandler):
  """Base class for handlers intended to be run only from the task queue.

  Sub-classes should implement
  1. the 'handle' method for all POST requests.
  2. '_preprocess' method for decoding or validations before handle.
  3. '_drop_gracefully' method if _preprocess fails and the task has to
     be dropped.

  In Python27 runtime, webapp2 will automatically replace webapp.
  """

  _DEFAULT_USER_AGENT = "App Engine Python MR"

  def __init__(self, *args, **kwargs):
    # webapp framework invokes initialize after __init__.
    # webapp2 framework invokes initialize within __init__.
    # Python27 runtime swap webapp with webapp2 underneath us.
    # Since initialize will conditionally change this field,
    # it needs to be set before calling super's __init__.
    self._preprocess_success = False
    super(TaskQueueHandler, self).__init__(*args, **kwargs)
    if cloudstorage:
      # Only configured when the full (non-stub) cloudstorage library is
      # importable; see the module-level import guard.
      cloudstorage.set_default_retry_params(
          cloudstorage.RetryParams(
              min_retries=5,
              max_retries=10,
              urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC,
              save_access_token=True,
              _user_agent=self._DEFAULT_USER_AGENT))

  def initialize(self, request, response):
    """Initialize.

    1. call webapp init.
    2. check request is indeed from taskqueue.
    3. check the task has not been retried too many times.
    4. run handler specific processing logic.
    5. run error handling logic if processing failed.

    Only when step 4 succeeds is self._preprocess_success set to True, which
    is what allows post() to call handle().

    Args:
      request: a webapp.Request instance.
      response: a webapp.Response instance.
    """
    super(TaskQueueHandler, self).initialize(request, response)

    # Check request is from taskqueue.
    if "X-AppEngine-QueueName" not in self.request.headers:
      logging.error(self.request.headers)
      logging.error("Task queue handler received non-task queue request")
      self.response.set_status(
          403, message="Task queue handler received non-task queue request")
      return

    # Check task has not been retried too many times.
    attempt = self.task_retry_count() + 1
    if attempt > parameters.config.TASK_MAX_ATTEMPTS:
      # .get: a missing header must not raise from inside the drop path.
      logging.error(
          "Task %s has been attempted %s times. Dropping it permanently.",
          self.request.headers.get("X-AppEngine-TaskName"),
          attempt)
      self._drop_gracefully()
      return

    try:
      self._preprocess()
      self._preprocess_success = True
    # Deliberately catch everything (not only Exception subclasses) so that
    # any preprocessing failure leads to the task being dropped gracefully.
    # pylint: disable=bare-except
    except:
      self._preprocess_success = False
      # logging.exception keeps the traceback, which logging.error would lose.
      logging.exception(
          "Preprocess task %s failed. Dropping it permanently.",
          self.request.headers.get("X-AppEngine-TaskName"))
      self._drop_gracefully()

  def post(self):
    """Runs handle() only if initialize() preprocessed the task successfully."""
    if self._preprocess_success:
      self.handle()

  def handle(self):
    """To be implemented by subclasses."""
    raise NotImplementedError()

  def _preprocess(self):
    """Preprocess.

    This method is called after webapp initialization code has been run
    successfully. It can thus access self.request, self.response and so on.
    Any exception raised here causes the task to be dropped via
    _drop_gracefully().
    """
    pass

  def _drop_gracefully(self):
    """Drop task gracefully.

    Called by initialize() before the task is dropped: either when
    _preprocess() failed, or when the task has exceeded
    parameters.config.TASK_MAX_ATTEMPTS attempts.
    """
    pass

  def task_retry_count(self):
    """Number of times this task has been retried.

    Read from the "X-AppEngine-TaskExecutionCount" header; 0 when absent.
    """
    return int(self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

  def retry_task(self):
    """Ask taskqueue to retry this task.

    Even though raising an exception can cause a task retry, it
    will flood logs with highly visible ERROR logs. Handlers should use
    this method to perform controlled task retries. Only raise exceptions
    for those that deserve ERROR log entries.
    """
    self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
    self.response.clear()


class JsonHandler(webapp.RequestHandler):
  """Base class for JSON handlers for user interface.

  Sub-classes should implement the 'handle' method. They should put their
  response data in the 'self.json_response' dictionary. Any exceptions raised
  by the sub-class implementation will be sent in a JSON response with the
  name of the error_class and the error_message.
  """

  def __init__(self, *args, **kwargs):
    """Initializer.

    Accepts **kwargs as well as *args, matching TaskQueueHandler, so that
    keyword construction is forwarded to the framework handler.
    """
    super(JsonHandler, self).__init__(*args, **kwargs)
    self.json_response = {}

  def base_path(self):
    """Base path for all mapreduce-related urls.

    JSON handlers are mapped to /base_path/command/command_name thus they
    require special treatment.

    Raises:
      BadRequestPathError: if the path does not end with "/command".

    Returns:
      The base path.
    """
    path = self.request.path
    base_path = path[:path.rfind("/")]
    if not base_path.endswith("/command"):
      raise BadRequestPathError(
          "Json handlers should have /command path prefix")
    return base_path[:base_path.rfind("/")]

  @staticmethod
  def _exception_message(e):
    """Returns the message of exception e for the JSON error response.

    Under Python 2, str() on an exception whose message is a non-ASCII
    unicode object raises UnicodeEncodeError; fall back to repr() so the
    original error is still reported instead of being masked.

    Args:
      e: the exception instance.

    Returns:
      A string describing the exception.
    """
    try:
      return str(e)
    except UnicodeError:
      return repr(e)

  def _handle_wrapper(self):
    """The helper method for handling JSON Post and Get requests.

    Rejects requests without an "X-Requested-With: XMLHttpRequest" header
    with a 403. Otherwise it runs self.handle() and writes
    self.json_response to the response as JSON. Exceptions from handle() are
    reported in the JSON body as 'error_class' / 'error_message' rather than
    propagated. A serialization failure results in a 500.
    """
    if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
      logging.error("Got JSON request with no X-Requested-With header")
      self.response.set_status(
          403, message="Got JSON request with no X-Requested-With header")
      return

    self.json_response.clear()
    try:
      self.handle()
    except errors.MissingYamlError:
      logging.debug("Could not find 'mapreduce.yaml' file.")
      self.json_response.clear()
      self.json_response["error_class"] = "Notice"
      self.json_response["error_message"] = "Could not find 'mapreduce.yaml'"
    except Exception as e:  # pylint: disable=broad-except
      logging.exception("Error in JsonHandler, returning exception.")
      # TODO(user): Include full traceback here for the end-user.
      # Discard any partial output handle() produced before failing.
      self.json_response.clear()
      self.json_response["error_class"] = e.__class__.__name__
      self.json_response["error_message"] = self._exception_message(e)

    self.response.headers["Content-Type"] = "text/javascript"
    try:
      output = json.dumps(self.json_response, cls=json_util.JsonEncoder)
    except Exception:  # pylint: disable=broad-except
      logging.exception("Could not serialize to JSON")
      self.response.set_status(500, message="Could not serialize to JSON")
      return
    self.response.out.write(output)

  def handle(self):
    """To be implemented by sub-classes."""
    raise NotImplementedError()


class PostJsonHandler(JsonHandler):
  """JSON handler that accepts POST requests."""

  def post(self):
    self._handle_wrapper()


class GetJsonHandler(JsonHandler):
  """JSON handler that accepts GET requests."""

  def get(self):
    self._handle_wrapper()


class HugeTaskHandler(TaskQueueHandler):
  """Base handler for processing HugeTasks."""

  class _RequestWrapper(object):
    """Container of a request and associated parameters.

    Parameters are decoded once via model.HugeTask.decode_payload and served
    by get()/set(). Any other attribute access is forwarded to the wrapped
    request.
    """

    def __init__(self, request):
      self._request = request
      self._params = model.HugeTask.decode_payload(request)

    def get(self, name, default=""):
      return self._params.get(name, default)

    def set(self, name, value):
      self._params[name] = value

    def __getattr__(self, name):
      # __getattr__ only runs when normal lookup fails. Never delegate our
      # own slots. On an instance built without __init__ (e.g. by copy or
      # pickle), looking up self._request would otherwise re-enter
      # __getattr__ and recurse forever.
      if name in ("_request", "_params"):
        raise AttributeError(name)
      return getattr(self._request, name)

  def _preprocess(self):
    """Replaces self.request with a wrapper exposing the decoded payload."""
    self.request = self._RequestWrapper(self.request)


if pipeline_base:
  # For backward compatibility.
  PipelineBase = pipeline_base.PipelineBase
else:
  PipelineBase = None