#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import unittest

try:
    from unittest import mock
except ImportError:
    import mock

from webapp.src import vtslab_status as Status
from webapp.src.endpoint import job_queue
from webapp.src.proto import model
from webapp.src.testing import unittest_base


class JobQueueTest(unittest_base.UnitTestBase):
    """A class to test job_queue endpoint API."""

    def setUp(self):
        """Initializes test by delegating to the base class setUp."""
        super(JobQueueTest, self).setUp()

    def _BuildHeartbeatRequest(self, job, status):
        """Builds a heartbeat request message identifying the given job.

        Args:
            job: a job message taken from an API response; its hostname,
                 manifest_branch, build_target, test_name and serial fields
                 are copied into the request.
            status: the job status value to report in the request.

        Returns:
            An instance of JOB_QUEUE_RESOURCE.combined_message_class.
        """
        return job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
            hostname=job.hostname,
            manifest_branch=job.manifest_branch,
            build_target=job.build_target,
            test_name=job.test_name,
            serial=job.serial,
            status=status,
        )

    def testGetJobModel(self):
        """Asserts a ready job can be leased, kept alive and completed.

        The test stores one device per serial and a single "ready" job, then:
          1. calls lease() and checks that the returned job carries every
             stored field, with its status switched to "leased", and that
             all devices are marked as in "use";
          2. calls heartbeat() with the leased status and checks that the
             stored job stays "leased" with a fresh heartbeat stamp;
          3. calls heartbeat() with the "complete" status and checks that
             the stored job becomes "complete" and all devices are "free".
        """
        test_values = {
            "test_type": Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT],
            "hostname": self.GetRandomString(),
            "priority": self.GetRandomString(),
            "test_name": self.GetRandomString(),
            "require_signed_device_build": False,
            "has_bootloader_img": True,
            "has_radio_img": False,
            "device": self.GetRandomString(),
            "serial": ["serial01", "serial02"],
            "build_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "manifest_branch": self.GetRandomString(),
            "build_target": self.GetRandomString(),
            "build_id": self.GetRandomString(),
            "pab_account_id": self.GetRandomString(),
            "shards": 1,
            "param": [""],
            "status": Status.JOB_STATUS_DICT["ready"],
            "period": 360,
            "gsi_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "gsi_branch": self.GetRandomString(),
            "gsi_build_target": self.GetRandomString(),
            "gsi_build_id": self.GetRandomString(),
            "gsi_pab_account_id": self.GetRandomString(),
            "gsi_vendor_version": self.GetRandomString(),
            "test_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "test_branch": self.GetRandomString(),
            "test_build_target": self.GetRandomString(),
            "test_build_id": self.GetRandomString(),
            "test_pab_account_id": self.GetRandomString(),
            "retry_count": 2,
            "infra_log_url": self.GetRandomString(),
            "image_package_repo_base": self.GetRandomString(),
            "report_bucket": [self.GetRandomString()],
            "report_spreadsheet_id": [self.GetRandomString()],
            "report_persistent_url": [self.GetRandomString()],
            "report_reference_url": [self.GetRandomString()],
        }

        # One device entity per serial referenced by the job.
        for serial in test_values["serial"]:
            self.GenerateDeviceModel(serial=serial).put()

        job = model.JobModel()
        for key, value in test_values.items():
            setattr(job, key, value)
        job.timestamp = datetime.datetime.now()
        job.put()

        container = job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
            hostname=test_values["hostname"])
        api = job_queue.JobQueueApi()
        response = api.lease(container)

        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)
        self.assertEqual(len(response.jobs), 1)
        leased_job = response.jobs[0]
        for key, value in test_values.items():
            # Compare by equality, not identity: `key is "status"` only
            # worked by accident of string interning.
            if key == "status":
                # The job was stored as "ready"; it comes back as "leased".
                expected = Status.JOB_STATUS_DICT["leased"]
            else:
                expected = value
            self.assertEqual(getattr(leased_job, key), expected)

        devices = model.DeviceModel.query().fetch()
        for device in devices:
            self.assertEqual(device.scheduling_status,
                             Status.DEVICE_SCHEDULING_STATUS_DICT["use"])

        # test job heartbeat api
        container = self._BuildHeartbeatRequest(
            response.jobs[0], response.jobs[0].status)
        api = job_queue.JobQueueApi()
        response = api.heartbeat(container)
        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)

        jobs = model.JobModel.query().fetch()
        self.assertEqual(len(jobs), 1)
        self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["leased"])
        # The heartbeat stamp must have been refreshed within the last second.
        self.assertLess(datetime.datetime.now() - jobs[0].heartbeat_stamp,
                        datetime.timedelta(seconds=1))

        # test job heartbeat api to complete the job
        container = self._BuildHeartbeatRequest(
            response.jobs[0], Status.JOB_STATUS_DICT["complete"])
        api = job_queue.JobQueueApi()
        response = api.heartbeat(container)
        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)

        jobs = model.JobModel.query().fetch()
        self.assertEqual(len(jobs), 1)
        self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["complete"])
        self.assertLess(datetime.datetime.now() - jobs[0].heartbeat_stamp,
                        datetime.timedelta(seconds=1))

        devices = model.DeviceModel.query().fetch()
        for device in devices:
            self.assertEqual(device.scheduling_status,
                             Status.DEVICE_SCHEDULING_STATUS_DICT["free"])


if __name__ == "__main__":
    unittest.main()