# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Job Queue Info APIs implemented using Google Cloud Endpoints."""

import datetime
import endpoints
import logging
import re

from webapp.src import vtslab_status as Status
from webapp.src.endpoint import endpoint_base
from webapp.src.proto import model
from webapp.src.utils import email_util
from webapp.src.utils import model_util

from google.appengine.ext import ndb

JOB_QUEUE_RESOURCE = endpoints.ResourceContainer(model.JobMessage)
GCS_URL_PREFIX = "gs://"
HTTP_HTTPS_REGEX = "^https?://"
STORAGE_API_URL = "https://storage.cloud.google.com/"


def _FetchDevices(serials):
    """Fetches the DeviceModel entities whose serial is in `serials`.

    Args:
        serials: a list of device serial strings.

    Returns:
        a list of DeviceModel entities; empty when `serials` is empty.
    """
    # An IN filter over an empty list cannot be executed as a datastore
    # query, so skip the query instead of failing in the middle of a request.
    if not serials:
        return []
    device_query = model.DeviceModel.query(
        model.DeviceModel.serial.IN(serials))
    return device_query.fetch()


@endpoints.api(name='job', version='v1')
class JobQueueApi(endpoint_base.EndpointBase):
    """Endpoint API for job_queue."""

    def _ToJobMessage(self, job):
        """Builds a JobMessage from the attributes it shares with `job`.

        Args:
            job: a JobModel entity.

        Returns:
            a JobMessage carrying every attribute reported by
            GetCommonAttributes for `job` and JobMessage.
        """
        job_message = model.JobMessage()
        for attr in self.GetCommonAttributes(job, model.JobMessage):
            setattr(job_message, attr, getattr(job, attr))
        return job_message

    @endpoints.method(
        JOB_QUEUE_RESOURCE,
        model.JobLeaseResponse,
        path='lease',
        http_method='POST',
        name='lease')
    def lease(self, request):
        """Gets the job(s) based on the condition specified in `request`.

        Picks one "ready" job queued for `request.hostname`, marks it as
        "leased" and marks the devices listed in the job as in "use".

        Args:
            request: a JobMessage; only `hostname` is read.

        Returns:
            a JobLeaseResponse with SUCCESS and the leased job, or with FAIL
            and an empty job list when no ready job exists for the host.
        """
        job_query = model.JobModel.query(
            model.JobModel.hostname == request.hostname,
            model.JobModel.status == Status.JOB_STATUS_DICT["ready"])
        existing_jobs = job_query.fetch()

        if not existing_jobs:
            return model.JobLeaseResponse(
                return_code=model.ReturnCodeMessage.FAIL, jobs=[])

        # Lowest priority value wins; ties go to the oldest timestamp. min()
        # returns the first minimal element, i.e. the same job that a stable
        # sort would place first.
        job = min(
            existing_jobs,
            key=lambda x: (Status.GetPriorityValue(x.priority), x.timestamp))
        job.status = Status.JOB_STATUS_DICT["leased"]
        job.put()

        # Built after the status change so the response reports "leased".
        job_message = self._ToJobMessage(job)

        devices = _FetchDevices(job.serial)
        for device in devices:
            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
                "use"]
        if devices:
            ndb.put_multi(devices)

        return model.JobLeaseResponse(
            return_code=model.ReturnCodeMessage.SUCCESS,
            jobs=[job_message])

    @endpoints.method(
        JOB_QUEUE_RESOURCE,
        model.JobLeaseResponse,
        path='heartbeat',
        http_method='POST',
        name='heartbeat')
    def heartbeat(self, request):
        """Processes the heartbeat signal from HC which leased queued job(s).

        Finds the leased job matching the request, applies the reported
        status to the job and its devices, stores the infra log URL if one is
        given, and refreshes the job's heartbeat stamp.

        Args:
            request: a JobMessage; `hostname`, `manifest_branch`,
                     `build_target`, `test_name` and `serial` identify the
                     job, `status` and `infra_log_url` carry the update.

        Returns:
            a JobLeaseResponse with SUCCESS and the matched job, or with FAIL
            and an empty job list when no matching leased job is found.
        """
        # minify jobs by query and confirm with serial from fetched jobs
        job_query = model.JobModel.query(
            model.JobModel.hostname == request.hostname,
            model.JobModel.manifest_branch == request.manifest_branch,
            model.JobModel.build_target == request.build_target,
            model.JobModel.test_name == request.test_name,
            model.JobModel.status == Status.JOB_STATUS_DICT["leased"])
        existing_jobs = job_query.fetch()
        # Serials are compared as sets so that ordering does not matter.
        requested_serials = set(request.serial)
        same_jobs = [
            x for x in existing_jobs if set(x.serial) == requested_serials
        ]

        if not same_jobs:
            return model.JobLeaseResponse(
                return_code=model.ReturnCodeMessage.FAIL, jobs=[])

        if len(same_jobs) > 1:
            logging.warning("[heartbeat] more than one job is found!")
            logging.warning(
                "[heartbeat] <hostname>%s <manifest_branch>%s "
                "<build_target>%s <test_name>%s <serials>%s",
                request.hostname, request.manifest_branch,
                request.build_target, request.test_name, request.serial)

        job = same_jobs[0]
        # The message is a snapshot taken before the updates below, so the
        # response reflects the job as it was stored before this heartbeat.
        job_message = self._ToJobMessage(job)

        devices = _FetchDevices(job.serial)
        logging.debug(
            "[heartbeat] heartbeat job: hostname=%s, "
            "test_name=%s, job creation time=%s",
            job.hostname, job.test_name, job.timestamp)
        logging.debug("[heartbeat] request status: %s", request.status)
        logging.debug("[heartbeat] - devices = %s",
                      ", ".join([device.serial for device in devices]))

        devices_to_put = []
        if request.status == Status.JOB_STATUS_DICT["complete"]:
            job.status = request.status
            for device in devices:
                device.scheduling_status = (
                    Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
                devices_to_put.append(device)
        elif (request.status in [
                Status.JOB_STATUS_DICT["infra-err"],
                Status.JOB_STATUS_DICT["bootup-err"]
        ]):
            job.status = request.status
            email_util.send_job_notification(job)
            for device in devices:
                # Release the devices, but flag their state as unknown.
                device.scheduling_status = (
                    Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
                device.status = Status.DEVICE_STATUS_DICT["unknown"]
                devices_to_put.append(device)
        elif request.status == Status.JOB_STATUS_DICT["leased"]:
            job.status = request.status
            for device in devices:
                device.timestamp = datetime.datetime.now()
                devices_to_put.append(device)
        else:
            # The job status and the devices are left untouched; the
            # heartbeat stamp below is still refreshed.
            logging.error(
                "[heartbeat] Unexpected job status is received. - "
                "status=%s, serials=%s", request.status, request.serial)
        if devices_to_put:
            ndb.put_multi(devices_to_put)

        if request.infra_log_url:
            if request.infra_log_url.startswith(GCS_URL_PREFIX):
                # Rewrite gs://<path> as a storage.cloud.google.com URL.
                url = "{}{}".format(
                    STORAGE_API_URL,
                    request.infra_log_url[len(GCS_URL_PREFIX):])
                job.infra_log_url = url
            elif re.match(HTTP_HTTPS_REGEX, request.infra_log_url):
                job.infra_log_url = request.infra_log_url
            else:
                logging.debug("[heartbeat] Wrong infra_log_url address.")

        job.heartbeat_stamp = datetime.datetime.now()
        job.put()
        model_util.UpdateParentSchedule(job, request.status)
        return model.JobLeaseResponse(
            return_code=model.ReturnCodeMessage.SUCCESS,
            jobs=[job_message])

    @endpoints.method(
        endpoint_base.GET_REQUEST_RESOURCE,
        model.JobResponseMessage,
        path="get",
        http_method="POST",
        name="get")
    def get(self, request):
        """Gets the jobs from datastore.

        Args:
            request: the get request, passed through to EndpointBase.Get.

        Returns:
            a JobResponseMessage holding the fetched jobs and a `has_next`
            flag, both as returned by EndpointBase.Get.
        """
        return_list, more = self.Get(request=request,
                                     metaclass=model.JobModel,
                                     message=model.JobMessage)

        return model.JobResponseMessage(jobs=return_list, has_next=more)

    @endpoints.method(
        endpoint_base.COUNT_REQUEST_RESOURCE,
        model.CountResponseMessage,
        path="count",
        http_method="POST",
        name="count")
    def count(self, request):
        """Gets total number of JobModel entities stored in datastore.

        Args:
            request: the count request; `filter` is turned into a filter
                     list with CreateFilterList.

        Returns:
            a CountResponseMessage holding the count returned by
            EndpointBase.Count for those filters.
        """
        filters = self.CreateFilterList(
            filter_string=request.filter, metaclass=model.JobModel)
        count = self.Count(metaclass=model.JobModel, filters=filters)

        return model.CountResponseMessage(count=count)