#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import mimetypes
import mmap
import os
import signal
import subprocess
import sys
import threading
import time
from multiprocessing.pool import ThreadPool

import httplib2
from oauth2client.client import GoogleCredentials

from common_utils import init_logging
from config import GCS_ARTIFACTS

# Directory containing this script. Made absolute so that helper scripts that
# live next to it are still found when the script is started through a bare
# relative path (dirname() of a bare file name is the empty string).
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
RESCAN_PERIOD_SEC = 5  # Scan for new artifact directories every X seconds.
WATCHDOG_SEC = 60 * 6  # Self kill after 6 minutes.

# Per-thread storage: every worker thread keeps its own authorized
# httplib2.Http object here (see get_http_obj()), so HTTP clients are never
# shared between threads.
tls = threading.local()

# This script uploads the artifacts directory of a single CI job
# ($ARTIFACTS_DIR/<job-id>) to the GCS_ARTIFACTS bucket.


def get_http_obj():
    """Return the calling thread's authorized HTTP client.

    The client is created and authorized on first use and then cached in
    thread-local storage, so each thread authenticates only once.

    Returns:
        An httplib2.Http object authorized with the application default
        credentials, scoped to the cloud-platform API.
    """
    http = getattr(tls, 'http', None)
    if http is not None:
        return http
    http = httplib2.Http()
    scopes = ['https://www.googleapis.com/auth/cloud-platform']
    creds = GoogleCredentials.get_application_default().create_scoped(scopes)
    creds.authorize(http)
    # Cache the client only once it has been authorized. Caching it earlier
    # would leave an unauthorized client behind if the credential lookup
    # raised, and every later call on this thread would silently reuse it.
    tls.http = http
    return http


def upload_one_file(fpath):
    """Upload one file to the GCS_ARTIFACTS bucket with an HTTP PUT.

    The object name is the path of the file relative to $ARTIFACTS_DIR (or to
    the current directory when the variable is unset).

    Args:
        fpath: Path of the file to upload.

    Returns:
        The size of the file in bytes on success, or -1 if the file does not
        exist or the server answered with a status other than 200.
    """
    # Same fallback as main(), so both agree on the root of the object names.
    artifacts_dir = os.getenv('ARTIFACTS_DIR', default=os.curdir)
    relpath = os.path.relpath(fpath, artifacts_dir)
    logging.debug('Uploading %s', relpath)
    # Explicit check rather than an assert: asserts are stripped under -O.
    if not os.path.exists(fpath):
        logging.error('File not found: %s', fpath)
        return -1
    fsize = os.path.getsize(fpath)
    mime_type = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'
    hdr = {'Content-Length': fsize, 'Content-type': mime_type}
    http = get_http_obj()
    uri = 'https://%s.storage.googleapis.com/%s' % (GCS_ARTIFACTS, relpath)

    # A zero-length file cannot be mmap-ed, so an empty body is sent instead.
    mm = None
    body = ''
    if fsize > 0:
        with open(fpath, 'rb') as f:
            # The mapping stays valid after the file object is closed.
            mm = mmap.mmap(f.fileno(), fsize, access=mmap.ACCESS_READ)
        body = mm
    try:
        resp, res = http.request(uri, method='PUT', headers=hdr, body=body)
    finally:
        # Release the mapping even when the request raises.
        if mm is not None:
            mm.close()

    if resp.status != 200:
        logging.error('HTTP request failed with code %d : %s', resp.status, res)
        return -1
    return fsize


def upload_one_file_with_retries(fpath, retry_delays=(0.5, 1.5, 3)):
    """Upload a file, retrying with the given back-off delays on failure.

    One upload attempt is made per entry of retry_delays; after each failed
    attempt the function sleeps for the corresponding number of seconds.

    Args:
        fpath: Path of the file to upload.
        retry_delays: Seconds to sleep after each failed attempt.

    Returns:
        The number of bytes uploaded, or -1 if every attempt failed.
    """
    for delay in retry_delays:
        res = upload_one_file(fpath)
        if res >= 0:
            return res
        logging.warning('Upload of %s failed, retrying in %s seconds', fpath,
                        delay)
        time.sleep(delay)
    # Always hand an int back: main() compares the result against 0, which
    # would raise TypeError on an implicit None.
    logging.error('Upload of %s failed after %d attempts', fpath,
                  len(retry_delays))
    return -1


def list_files(path):
    """Yield the path of every regular file found under path, recursively.

    Entries for which os.path.isfile() is false are skipped.
    """
    for root, _, files in os.walk(path):
        for fname in files:
            fpath = os.path.join(root, fname)
            if os.path.isfile(fpath):
                yield fpath


def scan_and_upload_perf_folder(job_id, dirpath):
    """Run perf_metrics_uploader.py on every file under <dirpath>/perf.

    Does nothing when the perf sub-directory does not exist.

    Args:
        job_id: CI job id, forwarded to the uploader through --job-id.
        dirpath: Artifacts directory of the job.
    """
    perf_folder = os.path.join(dirpath, 'perf')
    if not os.path.isdir(perf_folder):
        return
    uploader = os.path.join(CUR_DIR, 'perf_metrics_uploader.py')
    for path in list_files(perf_folder):
        ret = subprocess.call([uploader, '--job-id', job_id, path])
        if ret != 0:
            # Best effort: report the failure and carry on with the rest.
            logging.warning('%s exited with code %d for %s', uploader, ret,
                            path)


def main():
    """Upload all the artifacts of one CI job and return the exit code.

    Returns:
        1 if the job directory does not exist, 0 otherwise.
    """
    init_logging()
    # Watchdog: schedule a SIGALRM after WATCHDOG_SEC. No handler is installed
    # in this file.
    signal.alarm(WATCHDOG_SEC)
    mimetypes.add_type('application/wasm', '.wasm')

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--rm', action='store_true', help='Removes the directory')
    parser.add_argument(
        '--job-id',
        type=str,
        required=True,
        help='The Perfetto CI job ID to tie this upload to')
    args = parser.parse_args()
    job_id = args.job_id
    dirpath = os.path.join(
        os.getenv('ARTIFACTS_DIR', default=os.curdir), job_id)
    if not os.path.isdir(dirpath):
        logging.error('Directory not found: %s', dirpath)
        return 1

    # Make all artifacts readable by our user. Some of them are extracted as
    # rw-rw--- and owned by a different user (whatever the "sandbox" docker
    # container uid ends up mapping to).
    subprocess.call(['sudo', 'chown', '-R', str(os.geteuid()), dirpath])

    total_size = 0
    uploads = 0
    failures = 0
    files = list_files(dirpath)
    # The context manager releases the worker threads once the loop is done.
    with ThreadPool(processes=10) as pool:
        for upl_size in pool.imap_unordered(upload_one_file_with_retries,
                                            files):
            if upl_size >= 0:
                uploads += 1
                total_size += upl_size
            else:
                failures += 1

    logging.info('Uploaded artifacts for %s: %d files, %s failures, %d KB',
                 job_id, uploads, failures, total_size / 1e3)

    scan_and_upload_perf_folder(job_id, dirpath)

    if args.rm:
        subprocess.call(['sudo', 'rm', '-rf', dirpath])

    return 0


if __name__ == '__main__':
    sys.exit(main())