1#!/usr/bin/env python3
2# Copyright (C) 2019 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import argparse
17import httplib2
18import logging
19import mimetypes
20import mmap
21import os
22import subprocess
23import signal
24import sys
25import threading
26import time
27
28from common_utils import init_logging
29from config import GCS_ARTIFACTS
30from multiprocessing.pool import ThreadPool
31from oauth2client.client import GoogleCredentials
32
33CUR_DIR = os.path.dirname(__file__)
34RESCAN_PERIOD_SEC = 5  # Scan for new artifact directories every X seconds.
35WATCHDOG_SEC = 60 * 6  # Self kill after 5 minutes
36
37tls = threading.local()
38'''Polls for new directories under ARTIFACTS_DIR and uploads them to GCS'''
39
40
41def get_http_obj():
42  http = getattr(tls, 'http', None)
43  if http is not None:
44    return http
45  tls.http = httplib2.Http()
46  scopes = ['https://www.googleapis.com/auth/cloud-platform']
47  creds = GoogleCredentials.get_application_default().create_scoped(scopes)
48  creds.authorize(tls.http)
49  return tls.http
50
51
52def upload_one_file(fpath):
53  http = get_http_obj()
54  relpath = os.path.relpath(fpath, os.getenv('ARTIFACTS_DIR'))
55  logging.debug('Uploading %s', relpath)
56  assert (os.path.exists(fpath))
57  fsize = os.path.getsize(fpath)
58  mime_type = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'
59  mm = ''
60  hdr = {'Content-Length': fsize, 'Content-type': mime_type}
61  if fsize > 0:
62    with open(fpath, 'rb') as f:
63      mm = mmap.mmap(f.fileno(), fsize, access=mmap.ACCESS_READ)
64  uri = 'https://%s.storage.googleapis.com/%s' % (GCS_ARTIFACTS, relpath)
65  resp, res = http.request(uri, method='PUT', headers=hdr, body=mm)
66  if fsize > 0:
67    mm.close()
68  if resp.status != 200:
69    logging.error('HTTP request failed with code %d : %s', resp.status, res)
70    return -1
71  return fsize
72
73
74def upload_one_file_with_retries(fpath):
75  for retry in [0.5, 1.5, 3]:
76    res = upload_one_file(fpath)
77    if res >= 0:
78      return res
79    logging.warning('Upload of %s failed, retrying in %s seconds', fpath, retry)
80    time.sleep(retry)
81
82
83def list_files(path):
84  for root, _, files in os.walk(path):
85    for fname in files:
86      fpath = os.path.join(root, fname)
87      if os.path.isfile(fpath):
88        yield fpath
89
90
91def scan_and_upload_perf_folder(job_id, dirpath):
92  perf_folder = os.path.join(dirpath, 'perf')
93  if not os.path.isdir(perf_folder):
94    return
95  uploader = os.path.join(CUR_DIR, 'perf_metrics_uploader.py')
96  for path in list_files(perf_folder):
97    subprocess.call([uploader, '--job-id', job_id, path])
98
99
100def main():
101  init_logging()
102  signal.alarm(WATCHDOG_SEC)
103  mimetypes.add_type('application/wasm', '.wasm')
104
105  parser = argparse.ArgumentParser()
106  parser.add_argument('--rm', action='store_true', help='Removes the directory')
107  parser.add_argument(
108      '--job-id',
109      type=str,
110      required=True,
111      help='The Perfetto CI job ID to tie this upload to')
112  args = parser.parse_args()
113  job_id = args.job_id
114  dirpath = os.path.join(os.getenv('ARTIFACTS_DIR', default=os.curdir), job_id)
115  if not os.path.isdir(dirpath):
116    logging.error('Directory not found: %s', dirpath)
117    return 1
118
119  # Make all artifacts readable by our user. Some of them are extracted as
120  # rw-rw--- and owned by a diffrent user (whatever the "sandbox" docker
121  # container uid ends up mapping to).
122  subprocess.call(['sudo', 'chown', '-R', str(os.geteuid()), dirpath])
123
124  total_size = 0
125  uploads = 0
126  failures = 0
127  files = list_files(dirpath)
128  pool = ThreadPool(processes=10)
129  for upl_size in pool.imap_unordered(upload_one_file_with_retries, files):
130    uploads += 1 if upl_size >= 0 else 0
131    failures += 1 if upl_size < 0 else 0
132    total_size += max(upl_size, 0)
133
134  logging.info('Uploaded artifacts for %s: %d files, %s failures, %d KB',
135               job_id, uploads, failures, total_size / 1e3)
136
137  scan_and_upload_perf_folder(job_id, dirpath)
138
139  if args.rm:
140    subprocess.call(['sudo', 'rm', '-rf', dirpath])
141
142  return 0
143
144
145if __name__ == '__main__':
146  sys.exit(main())
147