1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import datetime
19import unittest
20
21try:
22    from unittest import mock
23except ImportError:
24    import mock
25
26from webapp.src import vtslab_status as Status
27from webapp.src.endpoint import job_queue
28from webapp.src.proto import model
29from webapp.src.testing import unittest_base
30
31
class JobQueueTest(unittest_base.UnitTestBase):
    """Tests the lease and heartbeat methods of the job_queue endpoint API."""

    def setUp(self):
        """Initializes the test by running the base class setUp."""
        super(JobQueueTest, self).setUp()

    def testGetJobModel(self):
        """Asserts a ready job can be leased, heartbeated and completed.

        The test stores one device entity per serial and a single job entity
        in "ready" status, then checks that:
          1. JobQueueApi.lease returns that job with status "leased" and all
             devices are marked as "use".
          2. JobQueueApi.heartbeat with the leased status keeps the job
             "leased" and refreshes its heartbeat_stamp.
          3. JobQueueApi.heartbeat with status "complete" marks the job
             "complete" and all devices as "free".
        """
        test_values = {
            "test_type": Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT],
            "hostname": self.GetRandomString(),
            "priority": self.GetRandomString(),
            "test_name": self.GetRandomString(),
            "require_signed_device_build": False,
            "has_bootloader_img": True,
            "has_radio_img": False,
            "device": self.GetRandomString(),
            "serial": ["serial01", "serial02"],
            "build_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "manifest_branch": self.GetRandomString(),
            "build_target": self.GetRandomString(),
            "build_id": self.GetRandomString(),
            "pab_account_id": self.GetRandomString(),
            "shards": 1,
            "param": [""],
            "status": Status.JOB_STATUS_DICT["ready"],
            "period": 360,
            "gsi_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "gsi_branch": self.GetRandomString(),
            "gsi_build_target": self.GetRandomString(),
            "gsi_build_id": self.GetRandomString(),
            "gsi_pab_account_id": self.GetRandomString(),
            "gsi_vendor_version": self.GetRandomString(),
            "test_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
            "test_branch": self.GetRandomString(),
            "test_build_target": self.GetRandomString(),
            "test_build_id": self.GetRandomString(),
            "test_pab_account_id": self.GetRandomString(),
            "retry_count": 2,
            "infra_log_url": self.GetRandomString(),
            "image_package_repo_base": self.GetRandomString(),
            "report_bucket": [self.GetRandomString()],
            "report_spreadsheet_id": [self.GetRandomString()],
            "report_persistent_url": [self.GetRandomString()],
            "report_reference_url": [self.GetRandomString()],
        }

        # Store one device entity for every serial the job refers to.
        for serial in test_values["serial"]:
            self.GenerateDeviceModel(serial=serial).put()

        # Store the job to be leased, populated from test_values.
        job = model.JobModel()
        for key, value in test_values.items():
            setattr(job, key, value)
        job.timestamp = datetime.datetime.now()
        job.put()

        # test job lease api
        container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
            hostname=test_values["hostname"]))
        api = job_queue.JobQueueApi()
        response = api.lease(container)

        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)
        self.assertEqual(len(response.jobs), 1)
        leased_job = response.jobs[0]
        for key, value in test_values.items():
            # Compare by equality, not identity: `is` on a string literal
            # only works by accident of interning. The status is the one
            # field expected to differ from the stored value after a lease.
            if key == "status":
                expected = Status.JOB_STATUS_DICT["leased"]
            else:
                expected = value
            self.assertEqual(getattr(leased_job, key), expected)

        devices = model.DeviceModel.query().fetch()
        for device in devices:
            self.assertEqual(device.scheduling_status,
                             Status.DEVICE_SCHEDULING_STATUS_DICT["use"])

        # test job heartbeat api
        container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
            hostname=response.jobs[0].hostname,
            manifest_branch=response.jobs[0].manifest_branch,
            build_target=response.jobs[0].build_target,
            test_name=response.jobs[0].test_name,
            serial=response.jobs[0].serial,
            status=response.jobs[0].status,
        ))
        api = job_queue.JobQueueApi()
        # NOTE: `response` is rebound to the heartbeat response here; the
        # completion request below is built from this response's jobs.
        response = api.heartbeat(container)
        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)

        jobs = model.JobModel.query().fetch()
        self.assertEqual(len(jobs), 1)
        self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["leased"])
        # The heartbeat stamp must have been refreshed within the last second.
        self.assertLess(datetime.datetime.now() - jobs[0].heartbeat_stamp,
                        datetime.timedelta(seconds=1))

        # test job heartbeat api to complete the job
        container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
            hostname=response.jobs[0].hostname,
            manifest_branch=response.jobs[0].manifest_branch,
            build_target=response.jobs[0].build_target,
            test_name=response.jobs[0].test_name,
            serial=response.jobs[0].serial,
            status=Status.JOB_STATUS_DICT["complete"],
        ))
        api = job_queue.JobQueueApi()
        response = api.heartbeat(container)
        self.assertEqual(response.return_code,
                         model.ReturnCodeMessage.SUCCESS)

        jobs = model.JobModel.query().fetch()
        self.assertEqual(len(jobs), 1)
        self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["complete"])
        self.assertLess(datetime.datetime.now() - jobs[0].heartbeat_stamp,
                        datetime.timedelta(seconds=1))

        # Completing the job is expected to release every device.
        devices = model.DeviceModel.query().fetch()
        for device in devices:
            self.assertEqual(device.scheduling_status,
                             Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
155
156
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
159