1# Copyright 2017 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Job Queue Info APIs implemented using Google Cloud Endpoints."""
15
16import datetime
17import endpoints
18import logging
19import re
20
21from webapp.src import vtslab_status as Status
22from webapp.src.endpoint import endpoint_base
23from webapp.src.proto import model
24from webapp.src.utils import email_util
25from webapp.src.utils import model_util
26
27from google.appengine.ext import ndb
28
29JOB_QUEUE_RESOURCE = endpoints.ResourceContainer(model.JobMessage)
30GCS_URL_PREFIX = "gs://"
31HTTP_HTTPS_REGEX = "^https?://"
32STORAGE_API_URL = "https://storage.cloud.google.com/"
33
34
35@endpoints.api(name='job', version='v1')
36class JobQueueApi(endpoint_base.EndpointBase):
37    """Endpoint API for job_queue."""
38
39    @endpoints.method(
40        JOB_QUEUE_RESOURCE,
41        model.JobLeaseResponse,
42        path='lease',
43        http_method='POST',
44        name='lease')
45    def lease(self, request):
46        """Gets the job(s) based on the condition specified in `request`."""
47        job_query = model.JobModel.query(
48            model.JobModel.hostname == request.hostname,
49            model.JobModel.status == Status.JOB_STATUS_DICT["ready"])
50        existing_jobs = job_query.fetch()
51
52        priority_sorted_jobs = sorted(
53            existing_jobs,
54            key=lambda x: (Status.GetPriorityValue(x.priority), x.timestamp))
55
56        if priority_sorted_jobs:
57            job = priority_sorted_jobs[0]
58            job.status = Status.JOB_STATUS_DICT["leased"]
59            job.put()
60
61            job_message = model.JobMessage()
62            common_attributes = self.GetCommonAttributes(job, model.JobMessage)
63            for attr in common_attributes:
64                setattr(job_message, attr, getattr(job, attr))
65
66            device_query = model.DeviceModel.query(
67                model.DeviceModel.serial.IN(job.serial))
68            devices = device_query.fetch()
69            devices_to_put = []
70            for device in devices:
71                device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
72                    "use"]
73                devices_to_put.append(device)
74            if devices_to_put:
75                ndb.put_multi(devices_to_put)
76
77            return model.JobLeaseResponse(
78                return_code=model.ReturnCodeMessage.SUCCESS,
79                jobs=[job_message])
80        else:
81            return model.JobLeaseResponse(
82                return_code=model.ReturnCodeMessage.FAIL, jobs=[])
83
84    @endpoints.method(
85        JOB_QUEUE_RESOURCE,
86        model.JobLeaseResponse,
87        path='heartbeat',
88        http_method='POST',
89        name='heartbeat')
90    def heartbeat(self, request):
91        """Processes the heartbeat signal from HC which leased queued job(s)."""
92        # minify jobs by query and confirm with serial from fetched jobs
93        job_query = model.JobModel.query(
94            model.JobModel.hostname == request.hostname,
95            model.JobModel.manifest_branch == request.manifest_branch,
96            model.JobModel.build_target == request.build_target,
97            model.JobModel.test_name == request.test_name,
98            model.JobModel.status == Status.JOB_STATUS_DICT["leased"])
99        existing_jobs = job_query.fetch()
100        same_jobs = [
101            x for x in existing_jobs if set(x.serial) == set(request.serial)
102        ]
103
104        if len(same_jobs) > 1:
105            logging.warning("[heartbeat] more than one job is found!")
106            logging.warning(
107                "[heartbeat] <hostname>{} <manifest_branch>{} "
108                "<build_target>{} <test_name>{} <serials>{}".format(
109                    request.hostname, request.manifest_branch,
110                    request.build_target, request.test_name, request.serial))
111
112        if same_jobs:
113            job = same_jobs[0]
114            job_message = model.JobMessage()
115            common_attributes = self.GetCommonAttributes(job, model.JobMessage)
116            for attr in common_attributes:
117                setattr(job_message, attr, getattr(job, attr))
118
119            device_query = model.DeviceModel.query(
120                model.DeviceModel.serial.IN(job.serial))
121            devices = device_query.fetch()
122            logging.debug("[heartbeat] heartbeat job: hostname={}, "
123                          "test_name={}, job creation time={}".format(
124                              job.hostname, job.test_name, job.timestamp))
125            logging.debug("[heartbeat] request status: {}".format(
126                request.status))
127            logging.debug("[heartbeat]  - devices = {}".format(
128                ", ".join([device.serial for device in devices])))
129            devices_to_put = []
130            if request.status == Status.JOB_STATUS_DICT["complete"]:
131                job.status = request.status
132                for device in devices:
133                    device.scheduling_status = (
134                        Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
135                    devices_to_put.append(device)
136            elif (request.status in [
137                    Status.JOB_STATUS_DICT["infra-err"],
138                    Status.JOB_STATUS_DICT["bootup-err"]
139            ]):
140                job.status = request.status
141                email_util.send_job_notification(job)
142                for device in devices:
143                    device.scheduling_status = (
144                        Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
145                    device.status = Status.DEVICE_STATUS_DICT["unknown"]
146                    devices_to_put.append(device)
147            elif request.status == Status.JOB_STATUS_DICT["leased"]:
148                job.status = request.status
149                for device in devices:
150                    device.timestamp = datetime.datetime.now()
151                    devices_to_put.append(device)
152            else:
153                logging.error(
154                    "[heartbeat] Unexpected job status is received. - {}".
155                    format(request.serial))
156            if devices_to_put:
157                ndb.put_multi(devices_to_put)
158
159            if request.infra_log_url:
160                if request.infra_log_url.startswith(GCS_URL_PREFIX):
161                    url = "{}{}".format(
162                        STORAGE_API_URL,
163                        request.infra_log_url[len(GCS_URL_PREFIX):])
164                    job.infra_log_url = url
165                elif re.match(HTTP_HTTPS_REGEX, request.infra_log_url):
166                    job.infra_log_url = request.infra_log_url
167                else:
168                    logging.debug("[heartbeat] Wrong infra_log_url address.")
169
170            job.heartbeat_stamp = datetime.datetime.now()
171            job.put()
172            model_util.UpdateParentSchedule(job, request.status)
173            return model.JobLeaseResponse(
174                return_code=model.ReturnCodeMessage.SUCCESS,
175                jobs=[job_message])
176
177        return model.JobLeaseResponse(
178            return_code=model.ReturnCodeMessage.FAIL, jobs=[])
179
180    @endpoints.method(
181        endpoint_base.GET_REQUEST_RESOURCE,
182        model.JobResponseMessage,
183        path="get",
184        http_method="POST",
185        name="get")
186    def get(self, request):
187        """Gets the jobs from datastore."""
188        return_list, more = self.Get(request=request,
189                                     metaclass=model.JobModel,
190                                     message=model.JobMessage)
191
192        return model.JobResponseMessage(jobs=return_list, has_next=more)
193
194    @endpoints.method(
195        endpoint_base.COUNT_REQUEST_RESOURCE,
196        model.CountResponseMessage,
197        path="count",
198        http_method="POST",
199        name="count")
200    def count(self, request):
201        """Gets total number of JobModel entities stored in datastore."""
202        filters = self.CreateFilterList(
203            filter_string=request.filter, metaclass=model.JobModel)
204        count = self.Count(metaclass=model.JobModel, filters=filters)
205
206        return model.CountResponseMessage(count=count)
207