1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import datetime
19import itertools
20import logging
21import re
22
23from google.appengine.ext import ndb
24
25from webapp.src import vtslab_status as Status
26from webapp.src.proto import model
27from webapp.src.utils import logger
28import webapp2
29
# Upper bound on the number of characters passed to a single logging.info()
# call when the scheduler flushes its collected log lines.
MAX_LOG_CHARACTERS = 10000
# Corrected schedule period (in minutes) used when the latest job of a
# schedule with errors ended in a boot-up error.
BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60

# Result codes returned by ScheduleHandler.CreateJob().
CREATE_JOB_SUCCESS = "success"
CREATE_JOB_FAILED_NO_BUILD = "no_build"
CREATE_JOB_FAILED_NO_DEVICE = "no_device"
36
37
# Matches a GCS path such as "gs://bucket/v9.0/..." and captures the version.
# At least one digit/dot is required so that an empty "/v/" path segment is
# not mistaken for a (matching) version.
_GCS_VERSION_PATTERN = re.compile(r"^gs://.*/v([0-9.]+)/.*")

# (pattern, version) pairs tried in order with re.match. Order matters: the
# o-mr1 pattern must be tried before the generic "o" pattern.
_BRANCH_VERSION_PATTERNS = (
    (re.compile(r"(git_)?(aosp-)?q.*"), "10.0"),
    (re.compile(r"(git_)?(aosp-)?p.*"), "9.0"),
    (re.compile(r"(git_)?(aosp-)?o[^-]*-m.*"), "8.1"),
    (re.compile(r"(git_)?(aosp-)?o.*"), "8.0"),
    (re.compile(r"(git_)?(aosp-)?master"), "master"),
)


def _GetVersionFromBranch(branch):
    """Derives a platform version string from a branch name or GCS path.

    Args:
        branch: a string, a manifest/gsi branch name or a "gs://" path.

    Returns:
        a string such as "9.0", "8.1" or "master", the version captured from
        a GCS path, or None if the version cannot be determined.
    """
    gcs_search = _GCS_VERSION_PATTERN.search(branch)
    if gcs_search:
        return gcs_search.group(1)
    for pattern, version in _BRANCH_VERSION_PATTERNS:
        if pattern.match(branch):
            return version
    return None


def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):
    """Compares manifest branch and gsi branch to get test type.

    This function only completes two LSBs which represent version related
    test type; all other bits of test_type are preserved.

    Args:
        manifest_branch: a string, manifest branch name.
        gsi_branch: a string, gsi branch name.
        test_type: an integer, previous test type value.

    Returns:
        An integer, test type value.
    """
    # Clear the two version bits (bit 0 and bit 1), keep everything else.
    value = test_type & ~0x3 if test_type else 0

    if not manifest_branch:
        logging.debug("manifest branch cannot be empty or None.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    if not gsi_branch:
        logging.debug("gsi_branch is empty.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]

    device_version = _GetVersionFromBranch(manifest_branch)
    if device_version is None:
        logging.debug("Unknown device version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    gsi_version = _GetVersionFromBranch(gsi_branch)
    if gsi_version is None:
        logging.debug("Unknown gsi version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    # Same version on device and GSI means a top-of-tree test; otherwise the
    # GSI differs from the device image.
    if device_version == gsi_version:
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
    return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]
111
112
class ScheduleHandler(webapp2.RequestHandler):
    """Background worker class for /worker/schedule_handler.

    This class pulls tasks from the 'queue-schedule' queue and processes them
    in the background service 'worker'.

    Attributes:
        logger: a logger.Logger instance. It is a class attribute, so it is
                shared by every instance of this handler.
    """
    logger = logger.Logger()

    # Placeholder build ID for GCS-stored artifacts. It marks the artifact as
    # resolved while CreateJob checks that all required builds exist, and is
    # replaced by an empty string before the job is stored.
    _GCS_BUILD_ID_PLACEHOLDER = "gcs"

    # Schedules and builds not updated within this many hours are ignored.
    _RECENT_HOURS = 72

    def _IsRecent(self, timestamp):
        """Returns True if timestamp lies within the last _RECENT_HOURS hours.

        Args:
            timestamp: a datetime.datetime instance or None.

        Returns:
            a boolean; False when timestamp is None.
        """
        if timestamp is None:
            return False
        return timestamp > (datetime.datetime.now() -
                            datetime.timedelta(hours=self._RECENT_HOURS))

    def _GetLabName(self, target_device):
        """Returns the lab part of a "lab/product" device string."""
        return target_device.split("/", 1)[0]

    def _CountEquipment(self, device):
        """Returns the number of equipment entries of a DeviceModel."""
        return len(device.device_equipment or [])

    def _GetEffectivePriority(self, schedule):
        """Returns the schedule's sort priority (lower values run first).

        priority_value may legitimately be 0 after being boosted, so the base
        priority is used only when priority_value is None.
        """
        if schedule.priority_value is not None:
            return schedule.priority_value
        return Status.GetPriorityValue(schedule.priority)

    def ReserveDevices(self, target_device_serials):
        """Marks the given devices as reserved in the datastore.

        Args:
            target_device_serials: a list of strings, containing target device
                                   serial numbers.
        """
        if not target_device_serials:
            # Nothing to reserve; also avoids an IN() query on an empty list.
            return
        device_query = model.DeviceModel.query(
            model.DeviceModel.serial.IN(target_device_serials))
        devices = device_query.fetch()
        for device in devices:
            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
                "reserved"]
        if devices:
            ndb.put_multi(devices)

    def FindBuildId(self, artifact_type, manifest_branch, target,
                    signed=False):
        """Finds a designated build ID among builds updated within 72 hours.

        Args:
            artifact_type: a string, build artifact type.
            manifest_branch: a string, build manifest branch.
            target: a string which build target and type are joined by '-'.
                    None is treated as an empty string.
            signed: a boolean to get a signed build.

        Returns:
            string, the highest matching build ID, or an empty string if no
            build matches.
        """
        target = target or ""
        # Split on the last '-' only so targets containing several '-' no
        # longer raise ValueError; the trailing token is the build type.
        if "-" in target:
            build_target, build_type = target.rsplit("-", 1)
        else:
            build_target, build_type = target, ""
        if not artifact_type or not manifest_branch or not build_target:
            self.logger.Println("The argument format is invalid.")
            return ""

        build_query = model.BuildModel.query(
            model.BuildModel.artifact_type == artifact_type,
            model.BuildModel.manifest_branch == manifest_branch,
            model.BuildModel.build_target == build_target,
            model.BuildModel.build_type == build_type)
        builds = [
            build for build in build_query.fetch()
            if self._IsRecent(build.timestamp)
        ]

        # Highest build ID first. NOTE(review): IDs are compared as strings,
        # so IDs of different lengths may not sort numerically.
        builds.sort(key=lambda x: x.build_id, reverse=True)
        for build in builds:
            if not signed or build.signed:
                self.logger.Println("-- Found build ID")
                return build.build_id
        return ""

    def post(self):
        """Creates jobs for the schedules that are due.

        If the request carries "schedule_key", only that schedule is handled
        and the created job is flagged as manual; otherwise every schedule
        that is not suspended is considered.
        """
        self.logger.Clear()
        manual_job = False
        schedule_key = self.request.get("schedule_key")
        if schedule_key:
            key = ndb.key.Key(urlsafe=schedule_key)
            manual_job = True
            schedules = [key.get()]
        else:
            schedule_query = model.ScheduleModel.query(
                model.ScheduleModel.suspended != True)
            schedules = schedule_query.fetch()

        # Drop schedules that no longer exist (key.get() returns None) and
        # those which are not updated within 72 hours.
        schedules = [
            schedule for schedule in schedules
            if schedule is not None and self._IsRecent(schedule.timestamp)
        ]
        schedules = self.FilterWithPeriod(schedules)

        if schedules:
            # groupby needs its input sorted by the same key.
            schedules.sort(key=self.GetProductName)
            groups = [
                list(group) for _, group in itertools.groupby(
                    schedules, self.GetProductName)
            ]
            for group in groups:
                self._ProcessProductGroup(group, manual_job)

        self.logger.Println("Scheduling completed.")
        self._FlushLogs()

    def _ProcessProductGroup(self, group, manual_job):
        """Creates jobs for one product's schedules in priority order.

        Args:
            group: a list of model.ScheduleModel instances sharing a product.
            manual_job: True if the jobs are created by a user.
        """
        group.sort(key=self._GetEffectivePriority)
        create_result = {
            CREATE_JOB_SUCCESS: [],
            CREATE_JOB_FAILED_NO_BUILD: [],
            CREATE_JOB_FAILED_NO_DEVICE: []
        }
        for schedule in group:
            self.logger.Println("")
            self.logger.Println("Schedule: %s (branch: %s)" %
                                (schedule.test_name,
                                 schedule.manifest_branch))
            self.logger.Println("Build Target: %s" % schedule.build_target)
            self.logger.Println("Device: %s" % schedule.device)
            self.logger.Indent()
            result, lab = self.CreateJob(schedule, manual_job)
            if result == CREATE_JOB_SUCCESS:
                create_result[result].append(lab)
            else:
                create_result[result].append(schedule)
            self.logger.Unindent()
        self._BoostStarvedSchedules(create_result[CREATE_JOB_SUCCESS],
                                    create_result[CREATE_JOB_FAILED_NO_DEVICE])

    def _BoostStarvedSchedules(self, created_labs, starved_schedules):
        """Raises priority of schedules that lost out on devices.

        A schedule that failed for lack of devices has its priority_value
        decremented by one (not below 0) when a job was created in one of its
        labs during the same run.

        Args:
            created_labs: a list of lab names where jobs were created.
            starved_schedules: a list of model.ScheduleModel instances which
                               failed with CREATE_JOB_FAILED_NO_DEVICE.
        """
        created_labs = set(created_labs)
        schedules_to_put = []
        for schedule in starved_schedules:
            # Compare exact lab names; a substring test would let "lab1"
            # match "lab10/product".
            schedule_labs = set(
                self._GetLabName(target) for target in schedule.device)
            if not created_labs & schedule_labs:
                continue
            if schedule.priority_value is None:
                schedule.priority_value = (
                    Status.GetPriorityValue(schedule.priority))
            if schedule.priority_value > 0:
                schedule.priority_value -= 1
                schedules_to_put.append(schedule)
        if schedules_to_put:
            ndb.put_multi(schedules_to_put)

    def _FlushLogs(self):
        """Emits collected log lines in chunks of MAX_LOG_CHARACTERS chars."""
        outputs = []
        chars = 0
        for line in (line.strip() for line in self.logger.Get()):
            if outputs and chars + len(line) > MAX_LOG_CHARACTERS:
                logging.info("\n".join(outputs))
                outputs = []
                chars = 0
            outputs.append(line)
            chars += len(line)
        if outputs:
            logging.info("\n".join(outputs))

    def CreateJob(self, schedule, manual_job=False):
        """Creates a job for given schedule.

        Args:
            schedule: model.ScheduleModel instance.
            manual_job: True if a job is created by a user, False otherwise.

        Returns:
            a tuple of two strings: the job creation result (one of
            CREATE_JOB_SUCCESS, CREATE_JOB_FAILED_NO_BUILD and
            CREATE_JOB_FAILED_NO_DEVICE), and the lab name if a job is
            created, otherwise an empty string.
        """
        target_host, target_device, target_device_serials = (
            self.SelectTargetLab(schedule))
        if not target_host:
            return CREATE_JOB_FAILED_NO_DEVICE, ""

        self.logger.Println("- Target host: %s" % target_host)
        self.logger.Println("- Target device: %s" % target_device)
        self.logger.Println("- Target serials: %s" % target_device_serials)

        # create job and add.
        new_job = model.JobModel()
        new_job.hostname = target_host
        new_job.priority = schedule.priority
        new_job.test_name = schedule.test_name
        new_job.require_signed_device_build = (
            schedule.require_signed_device_build)
        new_job.device = target_device
        new_job.period = schedule.period
        new_job.serial.extend(target_device_serials)
        new_job.build_storage_type = schedule.build_storage_type
        new_job.manifest_branch = schedule.manifest_branch
        new_job.build_target = schedule.build_target
        new_job.pab_account_id = schedule.device_pab_account_id
        new_job.shards = schedule.shards
        new_job.param = schedule.param
        new_job.retry_count = schedule.retry_count
        new_job.gsi_storage_type = schedule.gsi_storage_type
        new_job.gsi_branch = schedule.gsi_branch
        new_job.gsi_build_target = schedule.gsi_build_target
        new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
        new_job.gsi_vendor_version = schedule.gsi_vendor_version
        new_job.test_storage_type = schedule.test_storage_type
        new_job.test_branch = schedule.test_branch
        new_job.test_build_target = schedule.test_build_target
        new_job.test_pab_account_id = schedule.test_pab_account_id
        new_job.parent_schedule = schedule.key
        new_job.image_package_repo_base = schedule.image_package_repo_base
        new_job.required_host_equipment = schedule.required_host_equipment
        new_job.required_device_equipment = schedule.required_device_equipment
        new_job.has_bootloader_img = schedule.has_bootloader_img
        new_job.has_radio_img = schedule.has_radio_img
        new_job.report_bucket = schedule.report_bucket
        new_job.report_spreadsheet_id = schedule.report_spreadsheet_id
        new_job.report_persistent_url = schedule.report_persistent_url
        new_job.report_reference_url = schedule.report_reference_url

        # uses bit 0-1 to indicate version.
        test_type = GetTestVersionType(schedule.manifest_branch,
                                       schedule.gsi_branch)
        # uses bit 2
        if schedule.require_signed_device_build:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]

        if manual_job:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]

        new_job.test_type = test_type

        new_job.build_id = ""
        new_job.gsi_build_id = ""
        new_job.test_build_id = ""
        for artifact_type in ["device", "gsi", "test"]:
            if artifact_type == "device":
                storage_type_text = "build_storage_type"
                manifest_branch_text = "manifest_branch"
                build_target_text = "build_target"
                build_id_text = "build_id"
                signed = new_job.require_signed_device_build
            else:
                storage_type_text = artifact_type + "_storage_type"
                manifest_branch_text = artifact_type + "_branch"
                build_target_text = artifact_type + "_build_target"
                build_id_text = artifact_type + "_build_id"
                signed = False

            manifest_branch = getattr(new_job, manifest_branch_text)
            build_target = getattr(new_job, build_target_text)
            storage_type = getattr(new_job, storage_type_text)
            if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
                build_id = self.FindBuildId(
                    artifact_type=artifact_type,
                    manifest_branch=manifest_branch,
                    target=build_target,
                    signed=signed)
            elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
                # temp value to distinguish from empty values.
                build_id = self._GCS_BUILD_ID_PLACEHOLDER
            else:
                build_id = ""
                self.logger.Println(
                    "Unexpected storage type (%s)." % storage_type)
            setattr(new_job, build_id_text, build_id)

        # Every artifact whose branch is set must have resolved to a build.
        if ((not new_job.manifest_branch or new_job.build_id)
                and (not new_job.gsi_branch or new_job.gsi_build_id)
                and (not new_job.test_branch or new_job.test_build_id)):
            # Clear the GCS placeholders (exact match only, so a real build
            # ID that happens to contain "gcs" is left intact).
            for build_id_text in ("build_id", "gsi_build_id",
                                  "test_build_id"):
                if (getattr(new_job, build_id_text) ==
                        self._GCS_BUILD_ID_PLACEHOLDER):
                    setattr(new_job, build_id_text, "")
            self.ReserveDevices(target_device_serials)
            new_job.status = Status.JOB_STATUS_DICT["ready"]
            new_job.timestamp = datetime.datetime.now()
            new_job_key = new_job.put()
            schedule.children_jobs.append(new_job_key)
            schedule.priority_value = Status.GetPriorityValue(
                schedule.priority)
            schedule.put()
            self.logger.Println("A new job has been created.")
            labs = model.LabModel.query(
                model.LabModel.hostname == target_host).fetch(1)
            # SelectTargetLab found this host by looking up the lab named in
            # target_device, so that name is a safe fallback if the lab
            # entity cannot be re-fetched (avoids IndexError after the job
            # has already been stored).
            if labs:
                lab_name = labs[0].name
            else:
                lab_name = self._GetLabName(target_device)
            return CREATE_JOB_SUCCESS, lab_name
        else:
            self.logger.Println("Cannot find builds to create a job.")
            self.logger.Println("- Device branch / build - {} / {}".format(
                new_job.manifest_branch, new_job.build_id))
            self.logger.Println("- GSI branch / build - {} / {}".format(
                new_job.gsi_branch, new_job.gsi_build_id))
            self.logger.Println("- Test branch / build - {} / {}".format(
                new_job.test_branch, new_job.test_build_id))
            return CREATE_JOB_FAILED_NO_BUILD, ""

    def FilterWithPeriod(self, schedules):
        """Filters schedules with period.

        This method filters schedules if any children jobs are created within
        period time.

        Args:
            schedules: a list of model.ScheduleModel instances (a single
                       instance is also accepted).

        Returns:
            a list of model.ScheduleModel instances which need to create a new
            job.
        """
        if not schedules:
            return []

        if not isinstance(schedules, list):
            schedules = [schedules]

        ret_list = []
        for schedule in schedules:
            if not schedule.children_jobs:
                ret_list.append(schedule)
                continue

            latest_job = schedule.children_jobs[-1].get()
            if latest_job is None or latest_job.timestamp is None:
                # The latest job no longer exists or has no timestamp; treat
                # the schedule as due instead of aborting the whole run.
                ret_list.append(schedule)
                continue

            period = datetime.timedelta(
                minutes=self.GetCorrectedPeriod(schedule))
            if datetime.datetime.now() - latest_job.timestamp > period:
                ret_list.append(schedule)

        return ret_list

    def SelectTargetLab(self, schedule):
        """Find target host and devices to schedule a new job.

        Args:
            schedule: a proto containing the information of a schedule.

        Returns:
            a string which represents hostname,
            a string containing target lab and product with '/' separator,
            a list of selected devices serial (see whether devices will be
            selected later when the job is picked up.)
            (None, None, []) when no host has enough devices.
        """
        required_host_equipment = set(schedule.required_host_equipment or [])
        required_device_equipment = set(
            schedule.required_device_equipment or [])

        # Each entry: (hostname, "lab/product" string, selected devices).
        available_devices = []
        for target_device in schedule.device:
            if target_device.count("/") != 1:
                self.logger.Println(
                    "Device malformed - {}".format(target_device))
                continue

            target_lab, target_product_type = target_device.split("/")
            target_product_type = target_product_type.lower()
            self.logger.Println("- Lab %s" % target_lab)
            self.logger.Indent()
            target_hosts = model.LabModel.query(
                model.LabModel.name == target_lab).fetch()

            for host in target_hosts:
                if not (required_host_equipment <= set(
                        host.host_equipment or [])):
                    continue
                self.logger.Println("- Host: %s" % host.hostname)
                self.logger.Indent()
                device_query = model.DeviceModel.query(
                    model.DeviceModel.hostname == host.hostname,
                    model.DeviceModel.scheduling_status ==
                    Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
                    model.DeviceModel.status.IN([
                        Status.DEVICE_STATUS_DICT["fastboot"],
                        Status.DEVICE_STATUS_DICT["online"],
                        Status.DEVICE_STATUS_DICT["ready"]
                    ]))
                host_devices = [
                    x for x in device_query.fetch()
                    if (x.product or "").lower() == target_product_type and
                    required_device_equipment <= set(x.device_equipment or [])
                ]
                if len(host_devices) < schedule.shards:
                    self.logger.Println(
                        "A host {} does not have enough devices. "
                        "# of devices = {}, shards = {}".format(
                            host.hostname, len(host_devices),
                            schedule.shards))
                    self.logger.Unindent()
                    continue
                # Pick the least-equipped devices first.
                host_devices.sort(key=self._CountEquipment)
                available_devices.append(
                    (host.hostname, target_device,
                     host_devices[:schedule.shards]))
                self.logger.Unindent()

            self.logger.Unindent()

        if not available_devices:
            self.logger.Println("No hosts have enough devices for schedule!")
            return None, None, []

        # Choose the host whose selected devices carry the least equipment in
        # total (presumably to keep well-equipped devices free); min() keeps
        # the first candidate on ties, like a stable sort would.
        hostname, target_device, devices = min(
            available_devices,
            key=lambda entry: sum(self._CountEquipment(d) for d in entry[2]))
        return hostname, target_device, [x.serial for x in devices]

    def GetProductName(self, schedule):
        """Gets a product name from schedule instance.

        Args:
            schedule: a schedule instance.

        Returns:
            a string, product name in lowercase, or an empty string if the
            first device entry is missing or has no '/'.
        """
        if not schedule or not schedule.device:
            return ""

        if "/" not in schedule.device[0]:
            return ""

        return schedule.device[0].split("/")[1].lower()

    def GetCorrectedPeriod(self, schedule):
        """Corrects and returns period value based on latest children jobs.

        When the schedule has errors and its latest job ended in a boot-up
        error, the period is shortened to BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS.

        Args:
            schedule: a model.ScheduleModel instance containing schedule
                      information.

        Returns:
            an integer, corrected schedule period.
        """
        if not schedule.error_count or not schedule.children_jobs or (
                schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS):
            return schedule.period

        latest_job = schedule.children_jobs[-1].get()
        # A deleted latest job (get() returned None) keeps the normal period.
        if (latest_job is not None and
                latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]):
            return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS
        return schedule.period
550