#!/usr/bin/env python # # Copyright (C) 2018 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License") + "\n"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging from webapp.src import vtslab_status as Status from webapp.src.proto import model from webapp.src.scheduler import schedule_worker import webapp2 from google.appengine.api import taskqueue from google.appengine.ext import ndb PAGING_SIZE = 1000 DICT_MODELS = { "build": model.BuildModel, "device": model.DeviceModel, "lab": model.LabModel, "job": model.JobModel, "schedule": model.ScheduleModel } class CreateIndex(webapp2.RequestHandler): """Cron class for /tasks/indexing/{model}.""" def get(self, arg): """Creates a task to re-index, with given URL format.""" index_list = [] if arg: if arg.startswith("/") and arg[1:].lower() in DICT_MODELS.keys(): index_list.append(arg[1:].lower()) else: self.response.write("
Access Denied. Please visit "
                                    "/tasks/indexing/{model}
") return else: # accessed by /tasks/indexing index_list.extend(DICT_MODELS.keys()) self.response.write( "
Re-indexing task{} for {} {} going to be created.
". format("s" if len(index_list) > 1 else "", ", ".join(index_list), "are" if len(index_list) > 1 else "is")) for model_type in index_list: task = taskqueue.add( url="/worker/indexing", target="worker", queue_name="queue-indexing", transactional=False, params={ "model_type": model_type }) self.response.write( "
Re-indexing task for {} is created. ETA: {}
". format(model_type, task.eta)) class IndexingHandler(webapp2.RequestHandler): """Task queue handler class to re-index ndb model.""" def post(self): """Fetch entities and process model specific jobs.""" reload(model) model_type = self.request.get("model_type") num_updated = 0 next_cursor = None more = True while more: query = DICT_MODELS[model_type].query() entities, next_cursor, more = query.fetch_page( PAGING_SIZE, start_cursor=next_cursor) to_put = [] for entity in entities: if model_type == "build": pass elif model_type == "device": pass elif model_type == "lab": pass elif model_type == "job": # uses bits 0-1 to indicate version. test_type = schedule_worker.GetTestVersionType( entity.manifest_branch, entity.gsi_branch) # uses bit 2 if entity.require_signed_device_build: test_type |= ( Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]) entity.test_type = test_type if not entity.parent_schedule: # finds and links to a parent schedule. parent_schedule_query = model.ScheduleModel.query( model.ScheduleModel.priority == entity.priority, model.ScheduleModel.test_name == entity.test_name, model.ScheduleModel.period == entity.period, model.ScheduleModel.build_storage_type == ( entity.build_storage_type), model.ScheduleModel.manifest_branch == ( entity.manifest_branch), model.ScheduleModel.build_target == ( entity.build_target), model.ScheduleModel.device_pab_account_id == ( entity.pab_account_id), model.ScheduleModel.shards == entity.shards, model.ScheduleModel.retry_count == ( entity.retry_count), model.ScheduleModel.gsi_storage_type == ( entity.gsi_storage_type), model.ScheduleModel.gsi_branch == ( entity.gsi_branch), model.ScheduleModel.gsi_build_target == ( entity.gsi_build_target), model.ScheduleModel.gsi_pab_account_id == ( entity.gsi_pab_account_id), model.ScheduleModel.gsi_vendor_version == ( entity.gsi_vendor_version), model.ScheduleModel.test_storage_type == ( entity.test_storage_type), model.ScheduleModel.test_branch == ( entity.test_branch), model.ScheduleModel.test_build_target == ( entity.test_build_target), model.ScheduleModel.test_pab_account_id == ( entity.test_pab_account_id)) parent_schedules = parent_schedule_query.fetch() if not parent_schedules: logging.error("Parent not found.") else: parent_schedule = parent_schedules[0] parent_schedule.children_jobs.append(entity.key) entity.parent_schedule = parent_schedule.key to_put.append(parent_schedule) elif model_type == "schedule": if entity.error_count is None: entity.error_count = 0 if entity.suspended is None: entity.suspended = False if entity.build_storage_type is None: entity.build_storage_type = Status.STORAGE_TYPE_DICT[ "PAB"] # remove None children jobs. if entity.children_jobs: entity.children_jobs = [ x for x in entity.children_jobs if x] else: entity.children_jobs = [] for attr in ["has_bootloader_img", "has_radio_img"]: if getattr(entity, attr, None) is None: setattr(entity, attr, True) # set priority_value for old schedules. if entity.priority_value is None: entity.priority_value = Status.GetPriorityValue( entity.priority) else: pass to_put.append(entity) if to_put: ndb.put_multi(to_put) num_updated += len(to_put) logging.info("{} indexing complete with {} updates!".format( model_type, num_updated))