1#!/usr/bin/env python 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License") + "\n</pre>"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import logging 19 20from webapp.src import vtslab_status as Status 21from webapp.src.proto import model 22from webapp.src.scheduler import schedule_worker 23 24import webapp2 25from google.appengine.api import taskqueue 26from google.appengine.ext import ndb 27 28PAGING_SIZE = 1000 29DICT_MODELS = { 30 "build": model.BuildModel, 31 "device": model.DeviceModel, 32 "lab": model.LabModel, 33 "job": model.JobModel, 34 "schedule": model.ScheduleModel 35} 36 37 38class CreateIndex(webapp2.RequestHandler): 39 """Cron class for /tasks/indexing/{model}.""" 40 41 def get(self, arg): 42 """Creates a task to re-index, with given URL format.""" 43 index_list = [] 44 if arg: 45 if arg.startswith("/") and arg[1:].lower() in DICT_MODELS.keys(): 46 index_list.append(arg[1:].lower()) 47 else: 48 self.response.write("<pre>Access Denied. Please visit " 49 "/tasks/indexing/{model}</pre>") 50 return 51 else: 52 # accessed by /tasks/indexing 53 index_list.extend(DICT_MODELS.keys()) 54 self.response.write( 55 "<pre>Re-indexing task{} for {} {} going to be created.</pre>". 56 format("s" 57 if len(index_list) > 1 else "", ", ".join(index_list), "are" 58 if len(index_list) > 1 else "is")) 59 60 for model_type in index_list: 61 task = taskqueue.add( 62 url="/worker/indexing", 63 target="worker", 64 queue_name="queue-indexing", 65 transactional=False, 66 params={ 67 "model_type": model_type 68 }) 69 self.response.write( 70 "<pre>Re-indexing task for {} is created. ETA: {}</pre>". 71 format(model_type, task.eta)) 72 73 74class IndexingHandler(webapp2.RequestHandler): 75 """Task queue handler class to re-index ndb model.""" 76 77 def post(self): 78 """Fetch entities and process model specific jobs.""" 79 reload(model) 80 model_type = self.request.get("model_type") 81 82 num_updated = 0 83 next_cursor = None 84 more = True 85 86 while more: 87 query = DICT_MODELS[model_type].query() 88 entities, next_cursor, more = query.fetch_page( 89 PAGING_SIZE, start_cursor=next_cursor) 90 91 to_put = [] 92 for entity in entities: 93 if model_type == "build": 94 pass 95 elif model_type == "device": 96 pass 97 elif model_type == "lab": 98 pass 99 elif model_type == "job": 100 # uses bits 0-1 to indicate version. 101 test_type = schedule_worker.GetTestVersionType( 102 entity.manifest_branch, entity.gsi_branch) 103 # uses bit 2 104 if entity.require_signed_device_build: 105 test_type |= ( 106 Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]) 107 entity.test_type = test_type 108 109 if not entity.parent_schedule: 110 # finds and links to a parent schedule. 111 parent_schedule_query = model.ScheduleModel.query( 112 model.ScheduleModel.priority == entity.priority, 113 model.ScheduleModel.test_name == entity.test_name, 114 model.ScheduleModel.period == entity.period, 115 model.ScheduleModel.build_storage_type == ( 116 entity.build_storage_type), 117 model.ScheduleModel.manifest_branch == ( 118 entity.manifest_branch), 119 model.ScheduleModel.build_target == ( 120 entity.build_target), 121 model.ScheduleModel.device_pab_account_id == ( 122 entity.pab_account_id), 123 model.ScheduleModel.shards == entity.shards, 124 model.ScheduleModel.retry_count == ( 125 entity.retry_count), 126 model.ScheduleModel.gsi_storage_type == ( 127 entity.gsi_storage_type), 128 model.ScheduleModel.gsi_branch == ( 129 entity.gsi_branch), 130 model.ScheduleModel.gsi_build_target == ( 131 entity.gsi_build_target), 132 model.ScheduleModel.gsi_pab_account_id == ( 133 entity.gsi_pab_account_id), 134 model.ScheduleModel.gsi_vendor_version == ( 135 entity.gsi_vendor_version), 136 model.ScheduleModel.test_storage_type == ( 137 entity.test_storage_type), 138 model.ScheduleModel.test_branch == ( 139 entity.test_branch), 140 model.ScheduleModel.test_build_target == ( 141 entity.test_build_target), 142 model.ScheduleModel.test_pab_account_id == ( 143 entity.test_pab_account_id)) 144 parent_schedules = parent_schedule_query.fetch() 145 if not parent_schedules: 146 logging.error("Parent not found.") 147 else: 148 parent_schedule = parent_schedules[0] 149 parent_schedule.children_jobs.append(entity.key) 150 entity.parent_schedule = parent_schedule.key 151 to_put.append(parent_schedule) 152 153 elif model_type == "schedule": 154 if entity.error_count is None: 155 entity.error_count = 0 156 if entity.suspended is None: 157 entity.suspended = False 158 if entity.build_storage_type is None: 159 entity.build_storage_type = Status.STORAGE_TYPE_DICT[ 160 "PAB"] 161 # remove None children jobs. 162 if entity.children_jobs: 163 entity.children_jobs = [ 164 x for x in entity.children_jobs if x] 165 else: 166 entity.children_jobs = [] 167 168 for attr in ["has_bootloader_img", "has_radio_img"]: 169 if getattr(entity, attr, None) is None: 170 setattr(entity, attr, True) 171 172 # set priority_value for old schedules. 173 if entity.priority_value is None: 174 entity.priority_value = Status.GetPriorityValue( 175 entity.priority) 176 else: 177 pass 178 to_put.append(entity) 179 180 if to_put: 181 ndb.put_multi(to_put) 182 num_updated += len(to_put) 183 184 logging.info("{} indexing complete with {} updates!".format( 185 model_type, num_updated)) 186