# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import json
import logging
import math
import os
import urllib
import uuid
import webapp2

from google.appengine.api import taskqueue
from google.appengine.api import urlfetch
from perf_insights.endpoints.cloud_mapper import cloud_helper
from perf_insights.endpoints.cloud_mapper import job_info
from perf_insights import cloud_config

# If you modify this, you need to change the max_concurrent_requests in
# queue.yaml.
DEFAULT_TRACES_PER_INSTANCE = 4


class TaskPage(webapp2.RequestHandler):

  def _QueryForTraces(self, corpus, query):
    payload = urllib.urlencode({'q': query})
    query_url = '%s/query?%s' % (corpus, payload)

    headers = {
        'X-URLFetch-Service-Id': cloud_config.Get().urlfetch_service_id
    }

    result = urlfetch.fetch(url=query_url,
                            payload=payload,
                            method=urlfetch.GET,
                            headers=headers,
                            follow_redirects=False,
                            deadline=10)
    return json.loads(result.content)

  def _DispatchTracesAndWaitForResult(self, job, traces, num_instances):
    # Splits |li| into |cols| contiguous, roughly equal-sized slices.
    def _slice_it(li, cols=2):
      start = 0
      for i in xrange(cols):
        stop = start + len(li[i::cols])
        yield li[start:stop]
        start = stop

    # TODO(simonhatch): In the future it might be possible to specify only a
    # reducer and no mapper. Revisit this.
    bucket_path = cloud_config.Get().control_bucket_path + "/jobs/"

    mapper_url = ''
    reducer_url = ''

    if job.reducer:
      reducer_url = '%s%s.reducer' % (bucket_path, job.key.id())
      reducer_text = job.reducer.encode('ascii', 'ignore')
      cloud_helper.WriteGCS(reducer_url, reducer_text)

    if job.mapper:
      mapper_url = '%s%s.mapper' % (bucket_path, job.key.id())
      mapper_text = job.mapper.encode('ascii', 'ignore')
      cloud_helper.WriteGCS(mapper_url, mapper_text)

    version = self._GetVersion()

    tasks = {}

    # Split the traces up into N buckets.
    logging.info('Splitting traces across %d instances.' % num_instances)
    for current_traces in _slice_it(traces, num_instances):
      logging.info('Submitting job with %d traces.' % len(current_traces))
      task_id = str(uuid.uuid4())

      payload = {
          'revision': job.revision,
          'traces': json.dumps(current_traces),
          'result': '%s%s.result' % (bucket_path, task_id),
          'mapper': mapper_url,
          'mapper_function': job.mapper_function,
          'timeout': job.function_timeout,
      }
      taskqueue.add(
          queue_name='mapper-queue',
          url='/cloud_worker/task',
          target=version,
          name=task_id,
          params=payload)
      tasks[task_id] = {'status': 'IN_PROGRESS'}

    # On production servers, we could just sit and wait for the results, but
    # dev_server is single threaded and won't run any other tasks until the
    # current one is finished. We'll just do the easy thing for now and
    # queue a task to check for the result.
    mapper_timeout = int(job.timeout - job.function_timeout)
    timeout = (
        datetime.datetime.now() + datetime.timedelta(
            seconds=mapper_timeout)).strftime(
                '%Y-%m-%d %H:%M:%S')
    taskqueue.add(
        queue_name='default',
        url='/cloud_mapper/task',
        target=version,
        countdown=1,
        params={'jobid': job.key.id(),
                'type': 'check_map_results',
                'reducer': reducer_url,
                'tasks': json.dumps(tasks),
                'timeout': timeout})

  def _GetVersion(self):
    version = os.environ['CURRENT_VERSION_ID'].split('.')[0]
    if cloud_config._is_devserver():
      version = taskqueue.DEFAULT_APP_VERSION
    return version

  def _CheckOnMapResults(self, job):
    if job.status != 'IN_PROGRESS':
      return

    tasks = json.loads(self.request.get('tasks'))
    reducer_url = self.request.get('reducer')
    reducer_function = job.reducer_function
    revision = job.revision
    timeout = datetime.datetime.strptime(
        self.request.get('timeout'), '%Y-%m-%d %H:%M:%S')

    # TODO: There's no reducer yet, so we can't actually collapse multiple
    # results into one results file.
    mappers_done = True
    for task_id, task_values in tasks.iteritems():
      if task_values['status'] == 'DONE':
        continue
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      stat_result = cloud_helper.StatGCS(task_results_path)
      if stat_result is not None:
        logging.info(str(stat_result))
        tasks[task_id]['status'] = 'DONE'
      else:
        mappers_done = False

    logging.info("Tasks: %s" % str(tasks))

    if not mappers_done and datetime.datetime.now() < timeout:
      taskqueue.add(
          url='/cloud_mapper/task',
          target=self._GetVersion(),
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check_map_results',
                  'reducer': reducer_url,
                  'tasks': json.dumps(tasks),
                  'timeout': self.request.get('timeout')})
      return

    # Clear out any leftover tasks in case we just hit the timeout.
    self._CancelTasks(tasks)

    map_results = []
    for task_id, _ in tasks.iteritems():
      if tasks[task_id]['status'] != 'DONE':
        continue
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      map_results.append(task_results_path)

    # We'll only do 1 reduce job for now, maybe shard it better later.
    logging.info("Kicking off reduce.")
    task_id = str(uuid.uuid4())
    payload = {
        'revision': revision,
        'traces': json.dumps(map_results),
        'result': '%s/jobs/%s.result' % (
            cloud_config.Get().control_bucket_path, task_id),
        'reducer': reducer_url,
        'reducer_function': reducer_function,
        'timeout': job.function_timeout,
    }
    taskqueue.add(
        queue_name='mapper-queue',
        url='/cloud_worker/task',
        target=self._GetVersion(),
        name=task_id,
        params=payload)

    tasks = {}
    tasks[task_id] = {'status': 'IN_PROGRESS'}

    job.running_tasks = [task_id for task_id, _ in tasks.iteritems()]
    job.put()

    reduce_tasks = {}
    reduce_tasks[task_id] = {'status': 'IN_PROGRESS'}

    # On production servers, we could just sit and wait for the results, but
    # dev_server is single threaded and won't run any other tasks until the
    # current one is finished. We'll just do the easy thing for now and
    # queue a task to check for the result.
    reducer_timeout = int(job.function_timeout)
    timeout = (
        datetime.datetime.now() + datetime.timedelta(
            seconds=reducer_timeout)).strftime(
                '%Y-%m-%d %H:%M:%S')
    taskqueue.add(
        queue_name='default',
        url='/cloud_mapper/task',
        target=self._GetVersion(),
        countdown=1,
        params={'jobid': job.key.id(),
                'type': 'check_reduce_results',
                'tasks': json.dumps(reduce_tasks),
                'timeout': timeout})

  def _CancelTasks(self, tasks):
    task_names = [task_id for task_id, _ in tasks.iteritems()]
    taskqueue.Queue('mapper-queue').delete_tasks_by_name(task_names)

  def _CheckOnReduceResults(self, job):
    if job.status != 'IN_PROGRESS':
      return

    tasks = json.loads(self.request.get('tasks'))

    # TODO: There's really only one reducer job at the moment.
    results = None
    for task_id, _ in tasks.iteritems():
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      stat_result = cloud_helper.StatGCS(task_results_path)
      if stat_result is not None:
        tasks[task_id]['status'] = 'DONE'
        results = task_results_path

    logging.info("Reduce results: %s" % str(tasks))

    if not results:
      timeout = datetime.datetime.strptime(
          self.request.get('timeout'), '%Y-%m-%d %H:%M:%S')
      if datetime.datetime.now() > timeout:
        self._CancelTasks(tasks)
        job.status = 'ERROR'
        job.put()
        logging.error('Task timed out waiting for results.')
        return
      taskqueue.add(
          url='/cloud_mapper/task',
          target=self._GetVersion(),
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check_reduce_results',
                  'tasks': json.dumps(tasks),
                  'timeout': self.request.get('timeout')})
      return

    logging.info("Finished all tasks.")

    job.status = 'COMPLETE'
    job.results = results
    job.put()

  def _CalculateNumInstancesNeeded(self, num_traces):
    return int(math.ceil(float(num_traces) / DEFAULT_TRACES_PER_INSTANCE))

  def _RunMappers(self, job):
    # Get all the traces to process.
    traces = self._QueryForTraces(job.corpus, job.query)

    # We can probably be smarter about this down the road, maybe breaking
    # this into many smaller tasks and allowing each instance to run
    # several tasks at once. For now we'll just break it into a few big ones.
    num_instances = self._CalculateNumInstancesNeeded(len(traces))

    return self._DispatchTracesAndWaitForResult(job, traces, num_instances)

  def _CreateMapperJob(self, job):
    if job.status != 'QUEUED':
      return

    job.status = 'IN_PROGRESS'
    job.put()

    self._RunMappers(job)

  def post(self):
    self.response.headers['Content-Type'] = 'text/plain'

    jobid = self.request.get('jobid')
    job = job_info.JobInfo.get_by_id(jobid)
    if not job:
      return

    try:
      if self.request.get('type') == 'create':
        self._CreateMapperJob(job)
      elif self.request.get('type') == 'check_map_results':
        self._CheckOnMapResults(job)
      elif self.request.get('type') == 'check_reduce_results':
        self._CheckOnReduceResults(job)
    except Exception as e:
      job.status = 'ERROR'
      job.put()
      logging.exception('Failed job: %s' % e)


app = webapp2.WSGIApplication([('/cloud_mapper/task', TaskPage)])
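# For reference, a minimal sketch of the queue.yaml entries this handler
# assumes exist ('mapper-queue' for worker tasks, 'default' for the polling
# tasks). The rate and max_concurrent_requests values below are illustrative
# assumptions, not this project's actual settings; per the comment on
# DEFAULT_TRACES_PER_INSTANCE, max_concurrent_requests must be kept in sync
# with that constant.
#
#   queue:
#   - name: default
#     rate: 10/s
#   - name: mapper-queue
#     rate: 10/s
#     max_concurrent_requests: 100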