1#
2# Copyright (C) 2017 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import json
18import logging
19import requests
20import threading
21import time
22
23from host_controller.utils.parser import pb2_utils
24
25# Job status dict
26JOB_STATUS_DICT = {
27    # scheduled but not leased yet
28    "ready": 0,
29    # scheduled and in running
30    "leased": 1,
31    # completed job
32    "complete": 2,
33    # unexpected error during running
34    "infra-err": 3,
35    # never leased within schedule period
36    "expired": 4,
37    # device boot error after flashing the given img sets
38    "bootup-err": 5
39}
40
41SCHEDULE_INFO_PB2_ATTR_FILTERS = {
42    "pab_account_id": "device_pab_account_id",
43    "name": "build_target",
44}
45
46# timeout seconds for requests
47REQUESTS_TIMEOUT_SECONDS = 60
48
49
50class VtiEndpointClient(object):
51    """VTI (Vendor Test Infrastructure) endpoint client.
52
53    Attributes:
54        _headers: A dictionary, containing HTTP request header information.
55        _url: string, the base URL of an endpoint API.
56        _job: dict, currently leased job info.
57    """
58
59    def __init__(self, url):
60        if url == "localhost":
61            url = "http://localhost:8080/_ah/api/"
62        else:
63            if not url.startswith(("https://")) and not url.startswith("http://"):
64                url = "https://" + url
65            if url.endswith("appspot.com"):
66                url += "/_ah/api/"
67        self._headers = {"content-type": "application/json",
68                   "Accept-Charset": "UTF-8"}
69        self._url = url
70        self._job = {}
71        self._heartbeat_thread = None
72
73    def UploadBuildInfo(self, builds):
74        """Uploads the given build information to VTI.
75
76        Args:
77            builds: a list of dictionaries, containing info about all new
78                    builds found.
79
80        Returns:
81            True if successful, False otherwise.
82        """
83        url = self._url + "build/v1/set"
84        fail = False
85        for build in builds:
86            try:
87                response = requests.post(url, data=json.dumps(build),
88                                         headers=self._headers,
89                                         timeout=REQUESTS_TIMEOUT_SECONDS)
90                if response.status_code != requests.codes.ok:
91                    logging.error("UploadBuildInfo error: %s", response)
92                    fail = True
93            except requests.exceptions.Timeout as e:
94                logging.exception(e)
95                fail = True
96        if fail:
97            return False
98        return True
99
100    def UploadDeviceInfo(self, hostname, devices):
101        """Uploads the given device information to VTI.
102
103        Args:
104            hostname: string, the hostname of a target host.
105            devices: a list of dicts, containing info about all detected
106                     devices that are attached to the host.
107
108        Returns:
109            True if successful, False otherwise.
110        """
111        url = self._url + "host/v1/set"
112        payload = {}
113        payload["hostname"] = hostname
114        payload["devices"] = []
115        for device in devices:
116            new_device = {
117                "serial": device["serial"],
118                "product": device["product"],
119                "status": device["status"]}
120            payload["devices"].append(new_device)
121
122        try:
123            response = requests.post(url, data=json.dumps(payload),
124                                     headers=self._headers,
125                                     timeout=REQUESTS_TIMEOUT_SECONDS)
126        except (requests.exceptions.ConnectionError,
127                requests.exceptions.Timeout) as e:
128            logging.exception(e)
129            return False
130        if response.status_code != requests.codes.ok:
131            logging.error("UploadDeviceInfo error: %s", response)
132            return False
133        return True
134
135    def UploadScheduleInfo(self, pbs, clear_schedule):
136        """Uploads the given schedule information to VTI.
137
138        Args:
139            pbs: a list of dicts, containing info about all task schedules.
140            clear_schedule: bool, True to clear all schedule data exist on the
141                            scheduler
142
143        Returns:
144            True if successful, False otherwise.
145        """
146        if pbs is None or len(pbs) == 0:
147            return False
148
149        url = self._url + "schedule/v1/clear"
150        succ = True
151        if clear_schedule:
152            try:
153                response = requests.post(
154                    url, data=json.dumps({"manifest_branch": "na"}),
155                    headers=self._headers, timeout=REQUESTS_TIMEOUT_SECONDS)
156            except requests.exceptions.Timeout as e:
157                logging.exception(e)
158                return False
159            if response.status_code != requests.codes.ok:
160                logging.error("UploadScheduleInfo error: %s", response)
161                succ = False
162
163        if not succ:
164            return False
165
166        url = self._url + "schedule/v1/set"
167        for pb in pbs:
168            schedule = {}
169            succ = succ and pb2_utils.FillDictAndPost(
170                pb, schedule, url, self._headers,
171                SCHEDULE_INFO_PB2_ATTR_FILTERS, "UploadScheduleInfo")
172
173        return succ
174
175    def UploadLabInfo(self, pbs, clear_labinfo):
176        """Uploads the given lab information to VTI.
177
178        Args:
179            pbs: a list of dicts, containing info about all known labs.
180            clear_labinfo: bool, True to clear all lab data exist on the
181                           scheduler
182
183        Returns:
184            True if successful, False otherwise.
185        """
186        if pbs is None or len(pbs) == 0:
187            return
188
189        url = self._url + "lab/v1/clear"
190        succ = True
191        if clear_labinfo:
192            try:
193                response = requests.post(url, data=json.dumps({"name": "na"}),
194                                         headers=self._headers,
195                                         timeout=REQUESTS_TIMEOUT_SECONDS)
196            except requests.exceptions.Timeout as e:
197                logging.exception(e)
198                return False
199            if response.status_code != requests.codes.ok:
200                logging.error("UploadLabInfo error: %s", response)
201                succ = False
202
203        if not succ:
204            return False
205
206        url = self._url + "lab/v1/set"
207        for pb in pbs:
208            lab = {}
209            lab["name"] = pb.name
210            lab["owner"] = pb.owner
211            lab["admin"] = []
212            lab["admin"].extend(pb.admin)
213            lab["host"] = []
214            for host in pb.host:
215                new_host = {}
216                new_host["hostname"] = host.hostname
217                new_host["ip"] = host.ip
218                new_host["script"] = host.script
219                if host.host_equipment:
220                    new_host["host_equipment"] = []
221                    new_host["host_equipment"].extend(host.host_equipment)
222                new_host["device"] = []
223                if host.device:
224                    for device in host.device:
225                        new_device = {}
226                        new_device["serial"] = device.serial
227                        new_device["product"] = device.product
228                        if device.device_equipment:
229                            new_device["device_equipment"] = []
230                            new_device["device_equipment"].extend(
231                                device.device_equipment)
232                        new_host["device"].append(new_device)
233                lab["host"].append(new_host)
234            try:
235                response = requests.post(url, data=json.dumps(lab),
236                                         headers=self._headers,
237                                         timeout=REQUESTS_TIMEOUT_SECONDS)
238                if response.status_code != requests.codes.ok:
239                    logging.error("UploadLabInfo error: %s", response)
240                    succ = False
241            except requests.exceptions.Timeout as e:
242                logging.exception(e)
243                succ = False
244        return succ
245
246    def LeaseJob(self, hostname, execute=True):
247        """Leases a job for the given host, 'hostname'.
248
249        Args:
250            hostname: string, the hostname of a target host.
251            execute: boolean, True to lease and execute StartHeartbeat, which is
252                     the case that the leased job will be executed on this
253                     process's context.
254
255        Returns:
256            True if successful, False otherwise.
257        """
258        if not hostname:
259            return None, {}
260
261        url = self._url + "job/v1/lease"
262        try:
263            response = requests.post(url, data=json.dumps({"hostname": hostname}),
264                                     headers=self._headers,
265                                     timeout=REQUESTS_TIMEOUT_SECONDS)
266        except requests.exceptions.Timeout as e:
267            logging.exception(e)
268            return None, {}
269
270        if response.status_code != requests.codes.ok:
271            logging.error("LeaseJob error: %s", response.status_code)
272            return None, {}
273
274        response_json = json.loads(response.text)
275        if ("return_code" in response_json
276                and response_json["return_code"] != "SUCCESS"):
277            logging.debug("LeaseJob error: %s", response_json)
278            return None, {}
279
280        if "jobs" not in response_json:
281            logging.error(
282                "LeaseJob jobs not found in response json %s", response.text)
283            return None, {}
284
285        jobs = response_json["jobs"]
286        if jobs and len(jobs) > 0:
287            for job in jobs:
288                if execute == True:
289                    self._job = job
290                    self.StartHeartbeat("leased", 60)
291                return job["test_name"].split("/")[0], job
292        return None, {}
293
294    def ExecuteJob(self, job):
295        """Executes leased job passed from parent process.
296
297        Args:
298            job: dict, information the on leased job.
299
300        Returns:
301            a string which is path to a script file for onecmd().
302            a dict contains info on the leased job, will be passed to onecmd().
303        """
304        logging.info("Job info : {}".format(json.dumps(job)))
305        if job is not None:
306            self._job = job
307            self.StartHeartbeat("leased", 60)
308            return job["test_name"].split("/")[0], job
309
310        return None, {}
311
312    def UpdateLeasedJobStatus(self, status, update_interval):
313        """Updates the status of the leased job.
314
315        Args:
316            status: string, status value.
317            update_interval: int, time between heartbeats in second.
318        """
319        if self._job is None:
320            return
321
322        url = self._url + "job/v1/heartbeat"
323        self._job["status"] = JOB_STATUS_DICT[status]
324
325        thread = threading.currentThread()
326        while getattr(thread, 'keep_running', True):
327            try:
328                response = requests.post(url, data=json.dumps(self._job),
329                                         headers=self._headers,
330                                         timeout=REQUESTS_TIMEOUT_SECONDS)
331                if response.status_code != requests.codes.ok:
332                    logging.error("UpdateLeasedJobStatus error: %s", response)
333            except requests.exceptions.Timeout as e:
334                logging.exception(e)
335            time.sleep(update_interval)
336
337    def StartHeartbeat(self, status="leased", update_interval=60):
338        """Starts the hearbeat_thread.
339
340        Args:
341            status: string, status value.
342            update_interval: int, time between heartbeats in second.
343        """
344        if (self._heartbeat_thread is None
345                or hasattr(self._heartbeat_thread, 'keep_running')):
346            self._heartbeat_thread = threading.Thread(
347                target=self.UpdateLeasedJobStatus,
348                args=(
349                    status,
350                    update_interval,
351                ))
352            self._heartbeat_thread.daemon = True
353            self._heartbeat_thread.start()
354
355    def StopHeartbeat(self, status="complete", infra_log_url=""):
356        """Stops the hearbeat_thread and sets current job's status.
357
358        Args:
359            status: string, status value.
360            infra_log_url: string, URL to the uploaded infra log.
361        """
362        self._heartbeat_thread.keep_running = False
363
364        if self._job is None:
365            return
366
367        url = self._url + "job/v1/heartbeat"
368        self.SetJobStatusFromLeasedTo(status)
369        self._job["infra_log_url"] = infra_log_url
370
371        try:
372            response = requests.post(url, data=json.dumps(self._job),
373                                     headers=self._headers,
374                                     timeout=REQUESTS_TIMEOUT_SECONDS)
375            if response.status_code != requests.codes.ok:
376                logging.error("StopHeartbeat error: %s", response)
377        except requests.exceptions.Timeout as e:
378            logging.exception(e)
379
380        self._job = None
381
382    def SetJobStatusFromLeasedTo(self, status):
383        """Sets current job's status only when the job's status is 'leased'.
384
385        Args:
386            status: string, status value.
387        """
388        if (self._job is not None and
389            self._job["status"] == JOB_STATUS_DICT["leased"]):
390            self._job["status"] = JOB_STATUS_DICT[status]
391
392    def UploadHostVersion(self, hostname, vtslab_version):
393        """Uploads vtslab version.
394
395        Args:
396            hostname: string, the name of the host.
397            vtslab_version: string, current version of vtslab package.
398        """
399        url = self._url + "lab/v1/set_version"
400        host = {}
401        host["hostname"] = hostname
402        host["vtslab_version"] = vtslab_version
403
404        try:
405            response = requests.post(url, data=json.dumps(host),
406                                     headers=self._headers,
407                                     timeout=REQUESTS_TIMEOUT_SECONDS)
408        except (requests.exceptions.ConnectionError,
409                requests.exceptions.Timeout) as e:
410            logging.exception(e)
411            return
412        if response.status_code != requests.codes.ok:
413            logging.error("UploadHostVersion error: %s", response)
414
415    def CheckBootUpStatus(self):
416        """Checks whether the device_img + gsi from the job fails to boot up.
417
418        Returns:
419            True if the devices flashed with the given imgs from the leased job
420            succeed to boot up. False otherwise.
421        """
422        if self._job:
423            return (self._job["status"] != JOB_STATUS_DICT["bootup-err"])
424        return False
425
426    def GetJobTestType(self):
427        """Returns the test type of the leased job.
428
429        Returns:
430            int, test_type attr in the job message. 0 when there is no job
431            leased to this vti_endpoint_client.
432        """
433        if self._job and "test_type" in self._job:
434            try:
435                return int(self._job["test_type"])
436            except ValueError as e:
437                logging.exception(e)
438        return 0
439
440    def GetJobDeviceProductName(self):
441        """Returns the product name of the DUTs of the leased job.
442
443        Returns:
444            string, product name. An empty string if there is no job leased or
445            "device" attr of the job obj is not well formatted.
446        """
447        if self._job and "device" in self._job:
448            try:
449                return self._job["device"].split("/")[1]
450            except IndexError as e:
451                logging.exception(e)
452        return ""
453