1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import datetime
19import unittest
20
21try:
22    from unittest import mock
23except ImportError:
24    import mock
25
26from webapp.src import vtslab_status as Status
27from webapp.src.proto import model
28from webapp.src.scheduler import job_heartbeat
29from webapp.src.scheduler import schedule_worker
30from webapp.src.testing import unittest_base
31
32
class JobHeartbeatTest(unittest_base.UnitTestBase):
    """Tests for PeriodicJobHeartBeat cron class.

    Attributes:
        testbed: A Testbed instance which provides local unit testing.
        job_heartbeat: A job_heartbeat.PeriodicJobHeartBeat instance built
                       around a mock request, with a mock response attached.
    """

    def setUp(self):
        """Creates the heartbeat handler under test with mocked I/O."""
        super(JobHeartbeatTest, self).setUp()
        # The handler is constructed with a mock request and its response is
        # replaced by a mock, so get() can be invoked without a web server.
        self.job_heartbeat = job_heartbeat.PeriodicJobHeartBeat(mock.Mock())
        self.job_heartbeat.response = mock.Mock()
        self.job_heartbeat.response.write = mock.Mock()

    def _LeaseJobWithAge(self, job, age_seconds):
        """Stores a job as leased with stamps lying age_seconds in the past.

        Args:
            job: a model.JobModel entity to update and put.
            age_seconds: a number, how many seconds before now the job's
                         timestamp and heartbeat_stamp are set to.
        """
        # A single stamp is used for both fields so they cannot drift apart
        # between two separate now() calls.
        stamp = datetime.datetime.now() - datetime.timedelta(
            seconds=age_seconds)
        job.status = Status.JOB_STATUS_DICT["leased"]
        job.timestamp = stamp
        job.heartbeat_stamp = stamp
        job.put()

    def testJobHearbeat(self):
        """Asserts job heartbeat detects unavailable jobs.

        Two leased jobs are prepared: one whose stamps are older than
        job_heartbeat.JOB_RESPONSE_TIMEOUT_SECONDS and one that is still
        within it. After the heartbeat handler runs, exactly one job must be
        in infra-err status and its devices must be free for scheduling.

        The method name keeps its original spelling so that existing test
        filters referring to it continue to match.
        """
        num_of_devices = 2
        shards = 2

        lab = self.GenerateLabModel()
        lab.put()

        # num_of_devices devices are stored for each of the `shards` products.
        devices = []
        for _ in range(num_of_devices):
            for product_index in range(shards):
                device = self.GenerateDeviceModel(
                    hostname=lab.hostname,
                    product="product{}".format(product_index))
                device.put()
                devices.append(device)

        # One schedule per device.
        schedules = []
        for device in devices:
            schedule = self.GenerateScheduleModel(
                lab_model=lab, device_model=device, shards=shards)
            schedule.put()
            schedules.append(schedule)

        # Store every build entity generated for each schedule.
        for schedule in schedules:
            for build in self.GenerateBuildModel(schedule).values():
                build.put()

        # Mocking ScheduleHandler and essential methods. Every request
        # parameter lookup returns an empty string.
        scheduler = schedule_worker.ScheduleHandler(mock.Mock())
        scheduler.response = mock.Mock()
        scheduler.response.write = mock.Mock()
        scheduler.request.get = mock.MagicMock(return_value="")

        # Creating jobs.
        scheduler.post()
        jobs = model.JobModel.query().fetch()
        self.assertEqual(2, len(jobs))

        timeout = job_heartbeat.JOB_RESPONSE_TIMEOUT_SECONDS
        # jobs[0] will get old enough so it will be timed out.
        self._LeaseJobWithAge(jobs[0], timeout + 5)
        # jobs[1] will not exceed the timeout time.
        self._LeaseJobWithAge(jobs[1], timeout - 5)

        # Running the heartbeat check.
        self.job_heartbeat.get()

        # One job (jobs[0]) should be changed to infra-err status.
        jobs = model.JobModel.query().fetch()
        infra_error_jobs = [
            job for job in jobs
            if job.status == Status.JOB_STATUS_DICT["infra-err"]
        ]
        self.assertEqual(1, len(infra_error_jobs))

        # The timed-out job's devices should be changed to free scheduling
        # status.
        serials = infra_error_jobs[0].serial
        devices = model.DeviceModel.query(
            model.DeviceModel.serial.IN(serials)).fetch()
        # Without this check the loop below would pass vacuously if the
        # query matched no device.
        self.assertTrue(devices)
        for device in devices:
            self.assertEqual(Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
                             device.scheduling_status)
123
124
if __name__ == "__main__":
    # Hand control to the unittest runner when this file is executed
    # directly rather than imported.
    unittest.main()
127