# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import json
import logging
import math
import os
import urllib
import uuid
import webapp2

from google.appengine.api import taskqueue
from google.appengine.api import urlfetch
from perf_insights.endpoints.cloud_mapper import cloud_helper
from perf_insights.endpoints.cloud_mapper import job_info
from perf_insights import cloud_config

# If you modify this, you need to change the max_concurrent_requests in
# queue.yaml.
DEFAULT_TRACES_PER_INSTANCE = 4


class TaskPage(webapp2.RequestHandler):
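  """Task queue handler that runs and monitors cloud mapper jobs."""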

  def _QueryForTraces(self, corpus, query):
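    """Queries |corpus| for traces matching |query|; returns parsed JSON."""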
    payload = urllib.urlencode({'q': query})
    query_url = '%s/query?%s' % (corpus, payload)

    headers = {
        'X-URLFetch-Service-Id': cloud_config.Get().urlfetch_service_id
    }

    result = urlfetch.fetch(url=query_url,
                            method=urlfetch.GET,
                            headers=headers,
                            follow_redirects=False,
                            deadline=10)
    return json.loads(result.content)

  def _DispatchTracesAndWaitForResult(self, job, traces, num_instances):
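    """Fans |traces| out across mapper task instances.

    Uploads the job's mapper/reducer code to GCS, splits the traces across
    |num_instances| tasks on the mapper queue, and queues a follow-up task
    that polls for the map results.
    """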
    def _slice_it(li, cols=2):
      start = 0
      for i in xrange(cols):
        stop = start + len(li[i::cols])
        yield li[start:stop]
        start = stop

    # TODO(simonhatch): In the future it might be possible to only specify a
    # reducer and no mapper. Revisit this.
    bucket_path = cloud_config.Get().control_bucket_path + "/jobs/"

    mapper_url = ''
    reducer_url = ''

    if job.reducer:
      reducer_url = '%s%s.reducer' % (bucket_path, job.key.id())
      reducer_text = job.reducer.encode('ascii', 'ignore')
      cloud_helper.WriteGCS(reducer_url, reducer_text)

    if job.mapper:
      mapper_url = '%s%s.mapper' % (bucket_path, job.key.id())
      mapper_text = job.mapper.encode('ascii', 'ignore')
      cloud_helper.WriteGCS(mapper_url, mapper_text)

      version = self._GetVersion()

      tasks = {}

      # Split the traces up into N buckets.
      logging.info('Splitting traces across %d instances.' % num_instances)
      for current_traces in _slice_it(traces, num_instances):
        logging.info('Submitting job with %d traces.' % len(current_traces))
        task_id = str(uuid.uuid4())

        payload = {
            'revision': job.revision,
            'traces': json.dumps(current_traces),
            'result': '%s%s.result' % (bucket_path, task_id),
            'mapper': mapper_url,
            'mapper_function': job.mapper_function,
            'timeout': job.function_timeout,
        }
        taskqueue.add(
            queue_name='mapper-queue',
            url='/cloud_worker/task',
            target=version,
            name=task_id,
            params=payload)
        tasks[task_id] = {'status': 'IN_PROGRESS'}

      # On production servers, we could just sit and wait for the results, but
      # dev_server is single threaded and won't run any other tasks until the
      # current one is finished. We'll just do the easy thing for now and
      # queue a task to check for the result.
      mapper_timeout = int(job.timeout - job.function_timeout)
      timeout = (
          datetime.datetime.now() + datetime.timedelta(
              seconds=mapper_timeout)).strftime(
                  '%Y-%m-%d %H:%M:%S')
      taskqueue.add(
          queue_name='default',
          url='/cloud_mapper/task',
          target=version,
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check_map_results',
                  'reducer': reducer_url,
                  'tasks': json.dumps(tasks),
                  'timeout': timeout})

  def _GetVersion(self):
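    """Returns the task target version, or the default on dev_server."""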
    version = os.environ['CURRENT_VERSION_ID'].split('.')[0]
    if cloud_config._is_devserver():
      version = taskqueue.DEFAULT_APP_VERSION
    return version

  def _CheckOnMapResults(self, job):
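    """Checks whether every mapper task has written its result to GCS.

    Re-queues itself until the mappers finish or the timeout passes, then
    cancels any leftover tasks, kicks off a single reduce task, and queues
    a check for the reduce result.
    """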
    if job.status != 'IN_PROGRESS':
      return

    tasks = json.loads(self.request.get('tasks'))
    reducer_url = self.request.get('reducer')
    reducer_function = job.reducer_function
    revision = job.revision
    timeout = datetime.datetime.strptime(
        self.request.get('timeout'), '%Y-%m-%d %H:%M:%S')

    # TODO: There's no reducer yet, so we can't actually collapse multiple
    # results into one results file.
    mappers_done = True
    for task_id, task_values in tasks.iteritems():
      if task_values['status'] == 'DONE':
        continue
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      stat_result = cloud_helper.StatGCS(task_results_path)
      if stat_result is not None:
        logging.info(str(stat_result))
        tasks[task_id]['status'] = 'DONE'
      else:
        mappers_done = False

    logging.info("Tasks: %s" % str(tasks))

    if not mappers_done and datetime.datetime.now() < timeout:
      taskqueue.add(
          url='/cloud_mapper/task',
          target=self._GetVersion(),
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check_map_results',
                  'reducer': reducer_url,
                  'tasks': json.dumps(tasks),
                  'timeout': self.request.get('timeout')})
      return

    # Clear out any leftover tasks in case we just hit the timeout.
    self._CancelTasks(tasks)

    map_results = []
    for task_id, _ in tasks.iteritems():
      if tasks[task_id]['status'] != 'DONE':
        continue
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      map_results.append(task_results_path)

    # We'll only do 1 reduce job for now, maybe shard it better later
    logging.info("Kicking off reduce.")
    task_id = str(uuid.uuid4())
    payload = {
        'revision': revision,
        'traces': json.dumps(map_results),
        'result': '%s/jobs/%s.result' % (
            cloud_config.Get().control_bucket_path, task_id),
        'reducer': reducer_url,
        'reducer_function': reducer_function,
        'timeout': job.function_timeout,
    }
    taskqueue.add(
        queue_name='mapper-queue',
        url='/cloud_worker/task',
        target=self._GetVersion(),
        name=task_id,
        params=payload)

    reduce_tasks = {}
    reduce_tasks[task_id] = {'status': 'IN_PROGRESS'}

    job.running_tasks = [task_id for task_id, _ in reduce_tasks.iteritems()]
    job.put()

    # On production servers, we could just sit and wait for the results, but
    # dev_server is single threaded and won't run any other tasks until the
    # current one is finished. We'll just do the easy thing for now and
    # queue a task to check for the result.
    reducer_timeout = int(job.function_timeout)
    timeout = (
        datetime.datetime.now() + datetime.timedelta(
            seconds=reducer_timeout)).strftime(
                '%Y-%m-%d %H:%M:%S')
    taskqueue.add(
        queue_name='default',
        url='/cloud_mapper/task',
        target=self._GetVersion(),
        countdown=1,
        params={'jobid': job.key.id(),
                'type': 'check_reduce_results',
                'tasks': json.dumps(reduce_tasks),
                'timeout': timeout})

  def _CancelTasks(self, tasks):
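    """Deletes any still-pending tasks from the mapper queue by name."""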
    task_names = [task_id for task_id, _ in tasks.iteritems()]
    taskqueue.Queue('mapper-queue').delete_tasks_by_name(task_names)

  def _CheckOnReduceResults(self, job):
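    """Checks whether the reduce task has written its result to GCS.

    Re-queues itself while waiting; marks the job COMPLETE once the result
    appears, or ERROR if the timeout passes first.
    """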
    if job.status != 'IN_PROGRESS':
      return

    tasks = json.loads(self.request.get('tasks'))

    # TODO: There's really only one reducer job at the moment
    results = None
    for task_id, _ in tasks.iteritems():
      task_results_path = '%s/jobs/%s.result' % (
          cloud_config.Get().control_bucket_path, task_id)
      stat_result = cloud_helper.StatGCS(task_results_path)
      if stat_result is not None:
        tasks[task_id]['status'] = 'DONE'
        results = task_results_path

    logging.info("Reduce results: %s" % str(tasks))

    if not results:
      timeout = datetime.datetime.strptime(
          self.request.get('timeout'), '%Y-%m-%d %H:%M:%S')
      if datetime.datetime.now() > timeout:
        self._CancelTasks(tasks)
        job.status = 'ERROR'
        job.put()
        logging.error('Task timed out waiting for results.')
        return
      taskqueue.add(
          url='/cloud_mapper/task',
          target=self._GetVersion(),
          countdown=1,
          params={'jobid': job.key.id(),
                  'type': 'check_reduce_results',
                  'tasks': json.dumps(tasks),
                  'timeout': self.request.get('timeout')})
      return

    logging.info("Finished all tasks.")

    job.status = 'COMPLETE'
    job.results = results
    job.put()

  def _CalculateNumInstancesNeeded(self, num_traces):
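    """Returns how many instances are needed to map |num_traces| traces."""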
    return int(math.ceil(float(num_traces) / DEFAULT_TRACES_PER_INSTANCE))

  def _RunMappers(self, job):
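    """Queries for the job's traces and dispatches them to mapper tasks."""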
    # Get all the traces to process.
    traces = self._QueryForTraces(job.corpus, job.query)

    # We can probably be smarter about this down the road, maybe breaking
    # this into many smaller tasks and allowing each instance to run
    # several tasks at once. For now we'll just break it into a few big ones.
    num_instances = self._CalculateNumInstancesNeeded(len(traces))

    return self._DispatchTracesAndWaitForResult(job, traces, num_instances)

  def _CreateMapperJob(self, job):
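    """Moves a QUEUED job to IN_PROGRESS and starts its mappers."""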
    if job.status != 'QUEUED':
      return

    job.status = 'IN_PROGRESS'
    job.put()

    self._RunMappers(job)

  def post(self):
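    """Routes a task queue request based on its 'type' parameter."""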
    self.response.headers['Content-Type'] = 'text/plain'

    jobid = self.request.get('jobid')
    job = job_info.JobInfo.get_by_id(jobid)
    if not job:
      return

    try:
      if self.request.get('type') == 'create':
        self._CreateMapperJob(job)
      elif self.request.get('type') == 'check_map_results':
        self._CheckOnMapResults(job)
      elif self.request.get('type') == 'check_reduce_results':
        self._CheckOnReduceResults(job)
    except Exception as e:
      job.status = 'ERROR'
      job.put()
      logging.exception('Failed job: %s' % e)


app = webapp2.WSGIApplication([('/cloud_mapper/task', TaskPage)])