1# 2# Copyright (C) 2017 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import json 18import logging 19import requests 20import threading 21import time 22 23from host_controller.utils.parser import pb2_utils 24 25# Job status dict 26JOB_STATUS_DICT = { 27 # scheduled but not leased yet 28 "ready": 0, 29 # scheduled and in running 30 "leased": 1, 31 # completed job 32 "complete": 2, 33 # unexpected error during running 34 "infra-err": 3, 35 # never leased within schedule period 36 "expired": 4, 37 # device boot error after flashing the given img sets 38 "bootup-err": 5 39} 40 41SCHEDULE_INFO_PB2_ATTR_FILTERS = { 42 "pab_account_id": "device_pab_account_id", 43 "name": "build_target", 44} 45 46# timeout seconds for requests 47REQUESTS_TIMEOUT_SECONDS = 60 48 49 50class VtiEndpointClient(object): 51 """VTI (Vendor Test Infrastructure) endpoint client. 52 53 Attributes: 54 _headers: A dictionary, containing HTTP request header information. 55 _url: string, the base URL of an endpoint API. 56 _job: dict, currently leased job info. 57 """ 58 59 def __init__(self, url): 60 if url == "localhost": 61 url = "http://localhost:8080/_ah/api/" 62 else: 63 if not url.startswith(("https://")) and not url.startswith("http://"): 64 url = "https://" + url 65 if url.endswith("appspot.com"): 66 url += "/_ah/api/" 67 self._headers = {"content-type": "application/json", 68 "Accept-Charset": "UTF-8"} 69 self._url = url 70 self._job = {} 71 self._heartbeat_thread = None 72 73 def UploadBuildInfo(self, builds): 74 """Uploads the given build information to VTI. 75 76 Args: 77 builds: a list of dictionaries, containing info about all new 78 builds found. 79 80 Returns: 81 True if successful, False otherwise. 82 """ 83 url = self._url + "build/v1/set" 84 fail = False 85 for build in builds: 86 try: 87 response = requests.post(url, data=json.dumps(build), 88 headers=self._headers, 89 timeout=REQUESTS_TIMEOUT_SECONDS) 90 if response.status_code != requests.codes.ok: 91 logging.error("UploadBuildInfo error: %s", response) 92 fail = True 93 except requests.exceptions.Timeout as e: 94 logging.exception(e) 95 fail = True 96 if fail: 97 return False 98 return True 99 100 def UploadDeviceInfo(self, hostname, devices): 101 """Uploads the given device information to VTI. 102 103 Args: 104 hostname: string, the hostname of a target host. 105 devices: a list of dicts, containing info about all detected 106 devices that are attached to the host. 107 108 Returns: 109 True if successful, False otherwise. 110 """ 111 url = self._url + "host/v1/set" 112 payload = {} 113 payload["hostname"] = hostname 114 payload["devices"] = [] 115 for device in devices: 116 new_device = { 117 "serial": device["serial"], 118 "product": device["product"], 119 "status": device["status"]} 120 payload["devices"].append(new_device) 121 122 try: 123 response = requests.post(url, data=json.dumps(payload), 124 headers=self._headers, 125 timeout=REQUESTS_TIMEOUT_SECONDS) 126 except (requests.exceptions.ConnectionError, 127 requests.exceptions.Timeout) as e: 128 logging.exception(e) 129 return False 130 if response.status_code != requests.codes.ok: 131 logging.error("UploadDeviceInfo error: %s", response) 132 return False 133 return True 134 135 def UploadScheduleInfo(self, pbs, clear_schedule): 136 """Uploads the given schedule information to VTI. 137 138 Args: 139 pbs: a list of dicts, containing info about all task schedules. 140 clear_schedule: bool, True to clear all schedule data exist on the 141 scheduler 142 143 Returns: 144 True if successful, False otherwise. 145 """ 146 if pbs is None or len(pbs) == 0: 147 return False 148 149 url = self._url + "schedule/v1/clear" 150 succ = True 151 if clear_schedule: 152 try: 153 response = requests.post( 154 url, data=json.dumps({"manifest_branch": "na"}), 155 headers=self._headers, timeout=REQUESTS_TIMEOUT_SECONDS) 156 except requests.exceptions.Timeout as e: 157 logging.exception(e) 158 return False 159 if response.status_code != requests.codes.ok: 160 logging.error("UploadScheduleInfo error: %s", response) 161 succ = False 162 163 if not succ: 164 return False 165 166 url = self._url + "schedule/v1/set" 167 for pb in pbs: 168 schedule = {} 169 succ = succ and pb2_utils.FillDictAndPost( 170 pb, schedule, url, self._headers, 171 SCHEDULE_INFO_PB2_ATTR_FILTERS, "UploadScheduleInfo") 172 173 return succ 174 175 def UploadLabInfo(self, pbs, clear_labinfo): 176 """Uploads the given lab information to VTI. 177 178 Args: 179 pbs: a list of dicts, containing info about all known labs. 180 clear_labinfo: bool, True to clear all lab data exist on the 181 scheduler 182 183 Returns: 184 True if successful, False otherwise. 185 """ 186 if pbs is None or len(pbs) == 0: 187 return 188 189 url = self._url + "lab/v1/clear" 190 succ = True 191 if clear_labinfo: 192 try: 193 response = requests.post(url, data=json.dumps({"name": "na"}), 194 headers=self._headers, 195 timeout=REQUESTS_TIMEOUT_SECONDS) 196 except requests.exceptions.Timeout as e: 197 logging.exception(e) 198 return False 199 if response.status_code != requests.codes.ok: 200 logging.error("UploadLabInfo error: %s", response) 201 succ = False 202 203 if not succ: 204 return False 205 206 url = self._url + "lab/v1/set" 207 for pb in pbs: 208 lab = {} 209 lab["name"] = pb.name 210 lab["owner"] = pb.owner 211 lab["admin"] = [] 212 lab["admin"].extend(pb.admin) 213 lab["host"] = [] 214 for host in pb.host: 215 new_host = {} 216 new_host["hostname"] = host.hostname 217 new_host["ip"] = host.ip 218 new_host["script"] = host.script 219 if host.host_equipment: 220 new_host["host_equipment"] = [] 221 new_host["host_equipment"].extend(host.host_equipment) 222 new_host["device"] = [] 223 if host.device: 224 for device in host.device: 225 new_device = {} 226 new_device["serial"] = device.serial 227 new_device["product"] = device.product 228 if device.device_equipment: 229 new_device["device_equipment"] = [] 230 new_device["device_equipment"].extend( 231 device.device_equipment) 232 new_host["device"].append(new_device) 233 lab["host"].append(new_host) 234 try: 235 response = requests.post(url, data=json.dumps(lab), 236 headers=self._headers, 237 timeout=REQUESTS_TIMEOUT_SECONDS) 238 if response.status_code != requests.codes.ok: 239 logging.error("UploadLabInfo error: %s", response) 240 succ = False 241 except requests.exceptions.Timeout as e: 242 logging.exception(e) 243 succ = False 244 return succ 245 246 def LeaseJob(self, hostname, execute=True): 247 """Leases a job for the given host, 'hostname'. 248 249 Args: 250 hostname: string, the hostname of a target host. 251 execute: boolean, True to lease and execute StartHeartbeat, which is 252 the case that the leased job will be executed on this 253 process's context. 254 255 Returns: 256 True if successful, False otherwise. 257 """ 258 if not hostname: 259 return None, {} 260 261 url = self._url + "job/v1/lease" 262 try: 263 response = requests.post(url, data=json.dumps({"hostname": hostname}), 264 headers=self._headers, 265 timeout=REQUESTS_TIMEOUT_SECONDS) 266 except requests.exceptions.Timeout as e: 267 logging.exception(e) 268 return None, {} 269 270 if response.status_code != requests.codes.ok: 271 logging.error("LeaseJob error: %s", response.status_code) 272 return None, {} 273 274 response_json = json.loads(response.text) 275 if ("return_code" in response_json 276 and response_json["return_code"] != "SUCCESS"): 277 logging.debug("LeaseJob error: %s", response_json) 278 return None, {} 279 280 if "jobs" not in response_json: 281 logging.error( 282 "LeaseJob jobs not found in response json %s", response.text) 283 return None, {} 284 285 jobs = response_json["jobs"] 286 if jobs and len(jobs) > 0: 287 for job in jobs: 288 if execute == True: 289 self._job = job 290 self.StartHeartbeat("leased", 60) 291 return job["test_name"].split("/")[0], job 292 return None, {} 293 294 def ExecuteJob(self, job): 295 """Executes leased job passed from parent process. 296 297 Args: 298 job: dict, information the on leased job. 299 300 Returns: 301 a string which is path to a script file for onecmd(). 302 a dict contains info on the leased job, will be passed to onecmd(). 303 """ 304 logging.info("Job info : {}".format(json.dumps(job))) 305 if job is not None: 306 self._job = job 307 self.StartHeartbeat("leased", 60) 308 return job["test_name"].split("/")[0], job 309 310 return None, {} 311 312 def UpdateLeasedJobStatus(self, status, update_interval): 313 """Updates the status of the leased job. 314 315 Args: 316 status: string, status value. 317 update_interval: int, time between heartbeats in second. 318 """ 319 if self._job is None: 320 return 321 322 url = self._url + "job/v1/heartbeat" 323 self._job["status"] = JOB_STATUS_DICT[status] 324 325 thread = threading.currentThread() 326 while getattr(thread, 'keep_running', True): 327 try: 328 response = requests.post(url, data=json.dumps(self._job), 329 headers=self._headers, 330 timeout=REQUESTS_TIMEOUT_SECONDS) 331 if response.status_code != requests.codes.ok: 332 logging.error("UpdateLeasedJobStatus error: %s", response) 333 except requests.exceptions.Timeout as e: 334 logging.exception(e) 335 time.sleep(update_interval) 336 337 def StartHeartbeat(self, status="leased", update_interval=60): 338 """Starts the hearbeat_thread. 339 340 Args: 341 status: string, status value. 342 update_interval: int, time between heartbeats in second. 343 """ 344 if (self._heartbeat_thread is None 345 or hasattr(self._heartbeat_thread, 'keep_running')): 346 self._heartbeat_thread = threading.Thread( 347 target=self.UpdateLeasedJobStatus, 348 args=( 349 status, 350 update_interval, 351 )) 352 self._heartbeat_thread.daemon = True 353 self._heartbeat_thread.start() 354 355 def StopHeartbeat(self, status="complete", infra_log_url=""): 356 """Stops the hearbeat_thread and sets current job's status. 357 358 Args: 359 status: string, status value. 360 infra_log_url: string, URL to the uploaded infra log. 361 """ 362 self._heartbeat_thread.keep_running = False 363 364 if self._job is None: 365 return 366 367 url = self._url + "job/v1/heartbeat" 368 self.SetJobStatusFromLeasedTo(status) 369 self._job["infra_log_url"] = infra_log_url 370 371 try: 372 response = requests.post(url, data=json.dumps(self._job), 373 headers=self._headers, 374 timeout=REQUESTS_TIMEOUT_SECONDS) 375 if response.status_code != requests.codes.ok: 376 logging.error("StopHeartbeat error: %s", response) 377 except requests.exceptions.Timeout as e: 378 logging.exception(e) 379 380 self._job = None 381 382 def SetJobStatusFromLeasedTo(self, status): 383 """Sets current job's status only when the job's status is 'leased'. 384 385 Args: 386 status: string, status value. 387 """ 388 if (self._job is not None and 389 self._job["status"] == JOB_STATUS_DICT["leased"]): 390 self._job["status"] = JOB_STATUS_DICT[status] 391 392 def UploadHostVersion(self, hostname, vtslab_version): 393 """Uploads vtslab version. 394 395 Args: 396 hostname: string, the name of the host. 397 vtslab_version: string, current version of vtslab package. 398 """ 399 url = self._url + "lab/v1/set_version" 400 host = {} 401 host["hostname"] = hostname 402 host["vtslab_version"] = vtslab_version 403 404 try: 405 response = requests.post(url, data=json.dumps(host), 406 headers=self._headers, 407 timeout=REQUESTS_TIMEOUT_SECONDS) 408 except (requests.exceptions.ConnectionError, 409 requests.exceptions.Timeout) as e: 410 logging.exception(e) 411 return 412 if response.status_code != requests.codes.ok: 413 logging.error("UploadHostVersion error: %s", response) 414 415 def CheckBootUpStatus(self): 416 """Checks whether the device_img + gsi from the job fails to boot up. 417 418 Returns: 419 True if the devices flashed with the given imgs from the leased job 420 succeed to boot up. False otherwise. 421 """ 422 if self._job: 423 return (self._job["status"] != JOB_STATUS_DICT["bootup-err"]) 424 return False 425 426 def GetJobTestType(self): 427 """Returns the test type of the leased job. 428 429 Returns: 430 int, test_type attr in the job message. 0 when there is no job 431 leased to this vti_endpoint_client. 432 """ 433 if self._job and "test_type" in self._job: 434 try: 435 return int(self._job["test_type"]) 436 except ValueError as e: 437 logging.exception(e) 438 return 0 439 440 def GetJobDeviceProductName(self): 441 """Returns the product name of the DUTs of the leased job. 442 443 Returns: 444 string, product name. An empty string if there is no job leased or 445 "device" attr of the job obj is not well formatted. 446 """ 447 if self._job and "device" in self._job: 448 try: 449 return self._job["device"].split("/")[1] 450 except IndexError as e: 451 logging.exception(e) 452 return "" 453