#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import itertools
import logging
import re

from google.appengine.ext import ndb

from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.utils import logger
import webapp2

MAX_LOG_CHARACTERS = 10000  # maximum number of characters per each log
BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60  # retry interval (minutes) after a boot-up error

CREATE_JOB_SUCCESS = "success"
CREATE_JOB_FAILED_NO_BUILD = "no_build"
CREATE_JOB_FAILED_NO_DEVICE = "no_device"

# A GCS path such as "gs://bucket/.../v9.0/..." carries its version in the
# path; group 1 is the version string (possibly empty).
_GCS_VERSION_PATTERN = re.compile(r"^gs://.*/v([0-9.]*)/.*")

# Branch-name patterns and the version each one maps to. Order matters: the
# first matching entry wins, so the o-mr1 pattern must precede the generic
# o pattern.
_BRANCH_VERSION_PATTERNS = (
    (re.compile(r"(git_)?(aosp-)?q.*"), "10.0"),
    (re.compile(r"(git_)?(aosp-)?p.*"), "9.0"),
    (re.compile(r"(git_)?(aosp-)?o[^-]*-m.*"), "8.1"),
    (re.compile(r"(git_)?(aosp-)?o.*"), "8.0"),
    (re.compile(r"(git_)?(aosp-)?master"), "master"),
)


def _GetAndroidVersion(branch):
    """Maps a branch name or GCS path to a version string.

    Args:
        branch: a string, branch name or "gs://" path.

    Returns:
        a string such as "9.0" or "master", or None if nothing matches.
        Note that a GCS path may yield an empty string, which is still a
        match and must not be treated as "unknown".
    """
    gcs_search = _GCS_VERSION_PATTERN.search(branch)
    if gcs_search:
        return gcs_search.group(1)
    for pattern, version in _BRANCH_VERSION_PATTERNS:
        if pattern.match(branch):
            return version
    return None


def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):
    """Compares manifest branch and gsi branch to get test type.

    This function only sets the two LSBs (bits 0-1), which represent the
    version-related test type; every other bit of test_type is preserved.

    Args:
        manifest_branch: a string, manifest branch name.
        gsi_branch: a string, gsi branch name.
        test_type: an integer, previous test type value. 0 or None means
                   there is no previous value.

    Returns:
        An integer, test type value.
    """
    # Clear bits 0-1 so that only the version bits computed below are set.
    value = test_type & ~(1 | 1 << 1) if test_type else 0

    if not manifest_branch:
        logging.debug("manifest branch cannot be empty or None.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    if not gsi_branch:
        logging.debug("gsi_branch is empty.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]

    device_version = _GetAndroidVersion(manifest_branch)
    if device_version is None:
        logging.debug("Unknown device version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    gsi_version = _GetAndroidVersion(gsi_branch)
    if gsi_version is None:
        logging.debug("Unknown gsi version.")
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]

    # Same version on device and GSI -> TOT; different versions -> OTA.
    if device_version == gsi_version:
        return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
    return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]
class ScheduleHandler(webapp2.RequestHandler):
    """Background worker class for /worker/schedule_handler.

    This class pulls tasks from the 'queue-schedule' queue and processes
    them in the background service 'worker'.

    Attributes:
        logger: a logger.Logger instance, shared by all handler instances
                (class attribute).
    """
    logger = logger.Logger()

    # Temporary build ID for GCS-stored artifacts. It distinguishes "found in
    # GCS" from "no build found" (an empty string) while a job is assembled,
    # and is cleared before the job is stored.
    _GCS_BUILD_ID_PLACEHOLDER = "gcs"

    def ReserveDevices(self, target_device_serials):
        """Marks the devices with the given serials as reserved.

        Args:
            target_device_serials: a list of strings, containing target device
                                   serial numbers.
        """
        if not target_device_serials:
            return
        devices = model.DeviceModel.query(
            model.DeviceModel.serial.IN(target_device_serials)).fetch()
        for device in devices:
            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
                "reserved"]
        if devices:
            ndb.put_multi(devices)

    def FindBuildId(self, artifact_type, manifest_branch, target,
                    signed=False):
        """Finds the highest build ID among builds from the last 72 hours.

        Args:
            artifact_type: a string, build artifact type.
            manifest_branch: a string, build manifest branch.
            target: a string which build target and type are joined by '-'.
                    Only the first '-' separates the two; None is treated
                    like an empty string.
            signed: a boolean to get a signed build.

        Returns:
            string, build ID found, or an empty string if none matches.
        """
        build_id = ""
        # partition() never raises, unlike unpacking split("-") when the
        # target contains more than one '-'.
        build_target, _, build_type = (target or "").partition("-")
        if not artifact_type or not manifest_branch or not build_target:
            self.logger.Println("The argument format is invalid.")
            return build_id
        builds = model.BuildModel.query(
            model.BuildModel.artifact_type == artifact_type,
            model.BuildModel.manifest_branch == manifest_branch,
            model.BuildModel.build_target == build_target,
            model.BuildModel.build_type == build_type).fetch()

        cutoff = datetime.datetime.now() - datetime.timedelta(hours=72)
        builds = [build for build in builds if build.timestamp > cutoff]

        if builds:
            self.logger.Println("-- Found build ID")
            # build IDs are compared as strings (highest first).
            builds.sort(key=lambda x: x.build_id, reverse=True)
            for build in builds:
                if not signed or build.signed:
                    build_id = build.build_id
                    break
        return build_id

    def post(self):
        """Creates jobs for every schedule that is due.

        If the request carries a "schedule_key", only that schedule is
        considered and its job is flagged as manual; otherwise all schedules
        that are not suspended are considered. Schedules not updated within
        72 hours, or whose latest job is newer than their period, are
        skipped. The remaining schedules are processed per product, in
        priority order.
        """
        self.logger.Clear()
        manual_job = False
        schedule_key = self.request.get("schedule_key")
        if schedule_key:
            key = ndb.key.Key(urlsafe=schedule_key)
            manual_job = True
            # The entity may have been deleted since the task was queued.
            schedules = [schedule for schedule in [key.get()] if schedule]
        else:
            # ndb overloads comparison operators to build query filters, so
            # this must stay "!=" rather than "is not".
            schedule_query = model.ScheduleModel.query(
                model.ScheduleModel.suspended != True)
            schedules = schedule_query.fetch()

        if schedules:
            # filter out the schedules which are not updated within 72 hours.
            cutoff = datetime.datetime.now() - datetime.timedelta(hours=72)
            schedules = [
                schedule for schedule in schedules
                if schedule.timestamp > cutoff
            ]
            schedules = self.FilterWithPeriod(schedules)

        if schedules:
            # itertools.groupby only groups adjacent items, so sort by the
            # grouping key first.
            schedules.sort(key=self.GetProductName)
            group_by_product = [
                list(group) for _, group in itertools.groupby(
                    schedules, self.GetProductName)
            ]
            for group in group_by_product:
                self._ScheduleProductGroup(group, manual_job)

        self.logger.Println("Scheduling completed.")
        self._FlushLogToLogging()

    @staticmethod
    def _GetEffectivePriorityValue(schedule):
        """Returns priority_value, or the base value when it is unset.

        priority_value can legitimately be 0 after repeated boosts, so it is
        tested against None rather than for truthiness.
        """
        if schedule.priority_value is not None:
            return schedule.priority_value
        return Status.GetPriorityValue(schedule.priority)

    def _ScheduleProductGroup(self, group, manual_job):
        """Tries to create a job for each schedule of one product group.

        Args:
            group: a list of model.ScheduleModel instances with the same
                   product name.
            manual_job: True if the jobs are created by a user.
        """
        # Lower values are tried first.
        group.sort(key=self._GetEffectivePriorityValue)
        create_result = {
            CREATE_JOB_SUCCESS: [],
            CREATE_JOB_FAILED_NO_BUILD: [],
            CREATE_JOB_FAILED_NO_DEVICE: []
        }
        for schedule in group:
            self.logger.Println("")
            self.logger.Println("Schedule: %s (branch: %s)" %
                                (schedule.test_name,
                                 schedule.manifest_branch))
            self.logger.Println("Build Target: %s" % schedule.build_target)
            self.logger.Println("Device: %s" % schedule.device)
            self.logger.Indent()
            result, lab = self.CreateJob(schedule, manual_job)
            if result == CREATE_JOB_SUCCESS:
                create_result[result].append(lab)
            else:
                create_result[result].append(schedule)
            self.logger.Unindent()
        self._BoostStarvedSchedules(create_result[CREATE_JOB_SUCCESS],
                                    create_result[CREATE_JOB_FAILED_NO_DEVICE])

    def _BoostStarvedSchedules(self, created_labs, starved_schedules):
        """Raises the priority of schedules that ran out of devices.

        If any schedule in the group created a job in a lab, every schedule
        that failed for lack of devices and targets that same lab gets its
        priority_value decreased by one (not below 0).

        Args:
            created_labs: a list of lab names where jobs were created.
            starved_schedules: a list of model.ScheduleModel instances that
                               failed with CREATE_JOB_FAILED_NO_DEVICE.
        """
        if not created_labs:
            return
        created_labs = set(created_labs)
        schedules_to_put = []
        for schedule in starved_schedules:
            # Compare the lab part exactly; a substring test would let lab
            # "lab1" match target "lab10/product".
            target_labs = set(
                target.split("/", 1)[0] for target in schedule.device)
            if not target_labs & created_labs:
                continue
            if schedule.priority_value is None:
                schedule.priority_value = Status.GetPriorityValue(
                    schedule.priority)
            if schedule.priority_value > 0:
                schedule.priority_value -= 1
            schedules_to_put.append(schedule)
        if schedules_to_put:
            ndb.put_multi(schedules_to_put)

    def _FlushLogToLogging(self):
        """Emits the buffered scheduler log through logging.info in chunks.

        Each chunk holds at most MAX_LOG_CHARACTERS characters, not counting
        newlines. A single line longer than that is emitted on its own.
        """
        outputs = []
        chars = 0
        for line in self.logger.Get():
            line = line.strip()
            if outputs and chars + len(line) > MAX_LOG_CHARACTERS:
                logging.info("\n".join(outputs))
                outputs = []
                chars = 0
            outputs.append(line)
            chars += len(line)
        if outputs:
            logging.info("\n".join(outputs))

    def CreateJob(self, schedule, manual_job=False):
        """Creates a job for given schedule.

        Picks a host and devices, then resolves the device, GSI and test
        build IDs. If every artifact whose branch is configured has a build,
        the devices are reserved, the job is stored and linked to the
        schedule, and the schedule's priority_value is reset.

        Args:
            schedule: model.ScheduleModel instance.
            manual_job: True if a job is created by a user, False otherwise.

        Returns:
            a string of job creation result message (one of CREATE_JOB_*).
            a string of lab name if job is created, otherwise empty string.
        """
        target_host, target_device, target_device_serials = (
            self.SelectTargetLab(schedule))
        if not target_host:
            return CREATE_JOB_FAILED_NO_DEVICE, ""

        self.logger.Println("- Target host: %s" % target_host)
        self.logger.Println("- Target device: %s" % target_device)
        self.logger.Println("- Target serials: %s" % target_device_serials)

        # create job and add.
        new_job = self._NewJobFromSchedule(schedule, target_host,
                                           target_device,
                                           target_device_serials, manual_job)

        # (artifact type, storage type field, branch field, build target
        #  field, build ID field, whether a signed build is required)
        artifacts = (
            ("device", "build_storage_type", "manifest_branch",
             "build_target", "build_id",
             new_job.require_signed_device_build),
            ("gsi", "gsi_storage_type", "gsi_branch", "gsi_build_target",
             "gsi_build_id", False),
            ("test", "test_storage_type", "test_branch", "test_build_target",
             "test_build_id", False),
        )
        for (artifact_type, storage_type_field, branch_field, target_field,
             build_id_field, signed) in artifacts:
            storage_type = getattr(new_job, storage_type_field)
            if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
                build_id = self.FindBuildId(
                    artifact_type=artifact_type,
                    manifest_branch=getattr(new_job, branch_field),
                    target=getattr(new_job, target_field),
                    signed=signed)
            elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
                # temp value to distinguish from empty values.
                build_id = self._GCS_BUILD_ID_PLACEHOLDER
            else:
                build_id = ""
                self.logger.Println(
                    "Unexpected storage type (%s)." % storage_type)
            setattr(new_job, build_id_field, build_id)

        # A build is required only for artifacts whose branch is configured.
        missing_build = any(
            getattr(new_job, branch_field) and
            not getattr(new_job, build_id_field)
            for _, _, branch_field, _, build_id_field, _ in artifacts)
        if missing_build:
            self.logger.Println("Cannot find builds to create a job.")
            self.logger.Println("- Device branch / build - {} / {}".format(
                new_job.manifest_branch, new_job.build_id))
            self.logger.Println("- GSI branch / build - {} / {}".format(
                new_job.gsi_branch, new_job.gsi_build_id))
            self.logger.Println("- Test branch / build - {} / {}".format(
                new_job.test_branch, new_job.test_build_id))
            return CREATE_JOB_FAILED_NO_BUILD, ""

        # Clear only the exact placeholder; a substring replace would also
        # mangle a real build ID that happens to contain "gcs".
        for _, _, _, _, build_id_field, _ in artifacts:
            if (getattr(new_job, build_id_field) ==
                    self._GCS_BUILD_ID_PLACEHOLDER):
                setattr(new_job, build_id_field, "")

        self.ReserveDevices(target_device_serials)
        new_job.status = Status.JOB_STATUS_DICT["ready"]
        new_job.timestamp = datetime.datetime.now()
        new_job_key = new_job.put()
        schedule.children_jobs.append(new_job_key)
        # A created job resets any earlier priority boost to the base value.
        schedule.priority_value = Status.GetPriorityValue(schedule.priority)
        schedule.put()
        self.logger.Println("A new job has been created.")
        labs = model.LabModel.query(
            model.LabModel.hostname == target_host).fetch()
        # The job is already stored here, so don't raise IndexError when no
        # lab matches the hostname. Fall back to the lab part of
        # target_device, which is the lab the host was selected under.
        lab_name = labs[0].name if labs else target_device.split("/", 1)[0]
        return CREATE_JOB_SUCCESS, lab_name

    def _NewJobFromSchedule(self, schedule, target_host, target_device,
                            target_device_serials, manual_job):
        """Builds an unsaved model.JobModel from a schedule and a target.

        Args:
            schedule: model.ScheduleModel instance.
            target_host: a string, selected hostname.
            target_device: a string, "lab/product" of the selected target.
            target_device_serials: a list of selected device serials.
            manual_job: True if the job is created by a user.

        Returns:
            a model.JobModel instance (not yet put) with its test_type set.
            Build ID fields are left for the caller to fill.
        """
        new_job = model.JobModel()
        new_job.hostname = target_host
        new_job.priority = schedule.priority
        new_job.test_name = schedule.test_name
        new_job.require_signed_device_build = (
            schedule.require_signed_device_build)
        new_job.device = target_device
        new_job.period = schedule.period
        new_job.serial.extend(target_device_serials)
        new_job.build_storage_type = schedule.build_storage_type
        new_job.manifest_branch = schedule.manifest_branch
        new_job.build_target = schedule.build_target
        new_job.pab_account_id = schedule.device_pab_account_id
        new_job.shards = schedule.shards
        new_job.param = schedule.param
        new_job.retry_count = schedule.retry_count
        new_job.gsi_storage_type = schedule.gsi_storage_type
        new_job.gsi_branch = schedule.gsi_branch
        new_job.gsi_build_target = schedule.gsi_build_target
        new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
        new_job.gsi_vendor_version = schedule.gsi_vendor_version
        new_job.test_storage_type = schedule.test_storage_type
        new_job.test_branch = schedule.test_branch
        new_job.test_build_target = schedule.test_build_target
        new_job.test_pab_account_id = schedule.test_pab_account_id
        new_job.parent_schedule = schedule.key
        new_job.image_package_repo_base = schedule.image_package_repo_base
        new_job.required_host_equipment = schedule.required_host_equipment
        new_job.required_device_equipment = schedule.required_device_equipment
        new_job.has_bootloader_img = schedule.has_bootloader_img
        new_job.has_radio_img = schedule.has_radio_img
        new_job.report_bucket = schedule.report_bucket
        new_job.report_spreadsheet_id = schedule.report_spreadsheet_id
        new_job.report_persistent_url = schedule.report_persistent_url
        new_job.report_reference_url = schedule.report_reference_url

        # uses bit 0-1 to indicate version.
        test_type = GetTestVersionType(schedule.manifest_branch,
                                       schedule.gsi_branch)
        # uses bit 2
        if schedule.require_signed_device_build:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]
        if manual_job:
            test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]
        new_job.test_type = test_type
        return new_job

    def FilterWithPeriod(self, schedules):
        """Filters schedules with period.

        A schedule is kept if it has no children jobs, if its latest child
        job cannot be loaded or has no timestamp, or if that job is older
        than the schedule's corrected period (see GetCorrectedPeriod).

        Args:
            schedules: a list of model.ScheduleModel instances (a single
                       instance is also accepted).

        Returns:
            a list of model.ScheduleModel instances which need to create a new
            job.
        """
        ret_list = []
        if not schedules:
            return ret_list

        if not isinstance(schedules, list):
            schedules = [schedules]

        now = datetime.datetime.now()
        for schedule in schedules:
            if not schedule.children_jobs:
                ret_list.append(schedule)
                continue

            latest_job = schedule.children_jobs[-1].get()
            # A deleted job (or one without a timestamp) leaves nothing to
            # measure the period from; treat it like having no children
            # rather than raising AttributeError.
            if latest_job is None or latest_job.timestamp is None:
                ret_list.append(schedule)
                continue

            period = datetime.timedelta(
                minutes=self.GetCorrectedPeriod(schedule))
            if now - latest_job.timestamp > period:
                ret_list.append(schedule)

        return ret_list

    def SelectTargetLab(self, schedule):
        """Finds target host and devices to schedule a new job.

        For each "lab/product" entry in schedule.device, every host of that
        lab that has all required host equipment is searched. The search
        looks for free devices in fastboot, online or ready status whose
        product matches (case-insensitively) and that have all required
        device equipment. A host qualifies if it has at least
        schedule.shards such devices; its devices are ordered by ascending
        equipment count. Among qualifying hosts, the one whose first
        schedule.shards devices have the fewest equipment entries in total
        is chosen.

        Args:
            schedule: a proto containing the information of a schedule.

        Returns:
            a string which represents hostname,
            a string containing target lab and product with '/' separator,
            a list of selected devices serial (see whether devices will be
            selected later when the job is picked up.)
            (None, None, []) is returned if no host has enough devices.
        """
        required_host_equipment = set(schedule.required_host_equipment or ())
        required_device_equipment = set(
            schedule.required_device_equipment or ())
        usable_statuses = [
            Status.DEVICE_STATUS_DICT["fastboot"],
            Status.DEVICE_STATUS_DICT["online"],
            Status.DEVICE_STATUS_DICT["ready"]
        ]

        available_devices = []
        for target_device in schedule.device:
            # Exactly one '/' is needed to unpack into lab and product.
            if target_device.count("/") != 1:
                self.logger.Println(
                    "Device malformed - {}".format(target_device))
                continue

            target_lab, target_product_type = target_device.split("/")
            target_product_type = target_product_type.lower()
            self.logger.Println("- Lab %s" % target_lab)
            self.logger.Indent()
            target_hosts = model.LabModel.query(
                model.LabModel.name == target_lab).fetch()

            for host in target_hosts:
                if not required_host_equipment <= set(host.host_equipment):
                    continue
                self.logger.Println("- Host: %s" % host.hostname)
                self.logger.Indent()
                device_query = model.DeviceModel.query(
                    model.DeviceModel.hostname == host.hostname,
                    model.DeviceModel.scheduling_status ==
                    Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
                    model.DeviceModel.status.IN(usable_statuses))
                host_devices = [
                    device for device in device_query.fetch()
                    if device.product.lower() == target_product_type and
                    required_device_equipment <= set(device.device_equipment)
                ]
                if len(host_devices) < schedule.shards:
                    self.logger.Println(
                        "A host {} does not have enough devices. "
                        "# of devices = {}, shards = {}".format(
                            host.hostname, len(host_devices),
                            schedule.shards))
                    self.logger.Unindent()
                    continue
                host_devices.sort(
                    key=lambda x: (len(x.device_equipment)
                                   if x.device_equipment else 0))
                available_devices.append((host_devices, target_device))
                self.logger.Unindent()

            self.logger.Unindent()

        if not available_devices:
            self.logger.Println("No hosts have enough devices for schedule!")
            return None, None, []

        # min() returns the first minimal candidate, as a stable sort + [0]
        # would.
        host_devices, selected_target = min(
            available_devices,
            key=lambda candidate: sum(
                len(device.device_equipment)
                for device in candidate[0][:schedule.shards]))
        return (host_devices[0].hostname, selected_target,
                [device.serial for device in host_devices[:schedule.shards]])

    def GetProductName(self, schedule):
        """Gets a product name from schedule instance.

        Args:
            schedule: a schedule instance.

        Returns:
            a string, product name in lowercase, taken from the part after
            the first '/' of the first device entry. An empty string if
            there is no schedule, no device entry, or the entry has no '/'.
        """
        if not schedule or not schedule.device:
            return ""

        first_device = schedule.device[0]
        if "/" not in first_device:
            return ""

        return first_device.split("/")[1].lower()

    def GetCorrectedPeriod(self, schedule):
        """Corrects and returns period value based on latest children jobs.

        If the schedule has errors, has children jobs, its period is longer
        than BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS, and its latest child job
        ended in "bootup-err", the shorter retry interval is returned.

        Args:
            schedule: a model.ScheduleModel instance containing schedule
                      information.

        Returns:
            an integer, corrected schedule period in minutes.
        """
        if not schedule.error_count or not schedule.children_jobs or (
                schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS):
            return schedule.period

        latest_job = schedule.children_jobs[-1].get()
        # The latest job may have been deleted; fall back to the plain period.
        if (latest_job is not None and
                latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]):
            return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS
        return schedule.period