#!/usr/bin/python2
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
try:
    import cachetools
except ImportError:
    cachetools = None
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import time
import urllib

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from autotest_lib.utils.side_effects import config_loader
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be
# imported after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
    'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for the process; the higher the number, the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d jobs.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count  Directory name
=================== ====== ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
    'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
    'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
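# The -m flag makes gsutil parallelize supported operations (including cp and
# rsync) across multiple threads/processes, which speeds up large uploads at
# the cost of extra load on the offloading host.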
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
    'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_COMPRESSED_RESULT_PATTERN = 'testResult.xml.tgz'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
CTS_V2_COMPRESSED_RESULT_PATTERN = 'test_result.xml.tgz'

CTS_COMPRESSED_RESULT_TYPES = {
    CTS_COMPRESSED_RESULT_PATTERN: CTS_RESULT_PATTERN,
    CTS_V2_COMPRESSED_RESULT_PATTERN: CTS_V2_RESULT_PATTERN}

# Google Storage bucket URIs to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
    'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
    'CROS', 'cts_apfe_server', default='')
DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value(
    'CROS', 'ctsbvt_apfe_server', default='')

# Metadata types.
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect the list of CTS tests.
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'

def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including
    board and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary of the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first
        # one available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on the -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in Google Storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd


def _get_finish_cmd_list(gs_path):
    """Return a command to remotely mark a given gs path as finished.

    @param gs_path: Location in Google Storage where the offloaded directory
                    should be marked as finished.

    @return A command list to be executed by Popen.
    """
    target = os.path.join(gs_path, '.finished_offload')
    return [
        'gsutil',
        'cp',
        '/dev/null',
        target,
    ]


def sanitize_dir(dirpath):
    """Sanitize a directory for gs upload.

    Symlinks and FIFOs are converted to regular files to fix bugs.

    @param dirpath: Directory entry to be sanitized.
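
    Note: file and directory names are also escaped (via gslib.escape) so
    that characters gsutil treats specially, such as wildcards, do not
    break the upload.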
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory contents to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert FIFOs to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a FIFO with a normal file.

    @param path: FIFO path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating fifo marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating symlink marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 3000
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    """Return the list of subfolders of |dir_entry| eligible for zipping."""
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in a given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.',
                        dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For test jobs, zip folders at the second level, e.g. 123-debug/host1.
    # This is to keep the autoserv debug folder accessible.
    # For a special task, there is no need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Compress a directory into a tarball and remove the original.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the
    autoserv process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
            ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry])
        subprocess.check_call(
            ['find', dir_entry, '-type', 'd',
             '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload CTS/GTS test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz files to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on the -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [
                (cts_path, CTS_RESULT_PATTERN),
                (cts_path, CTS_COMPRESSED_RESULT_PATTERN),
                (cts_v2_path, CTS_V2_RESULT_PATTERN),
                (cts_v2_path, CTS_V2_COMPRESSED_RESULT_PATTERN),
                (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    # Treat BVT and non-BVT CTS test results the same:
                    # offload them to the APFE and result buckets. More
                    # details in b/172869794. We will make this more
                    # structured when moving to synchronous offloading.
                    _upload_files(host, path, result_pattern,
                                  multiprocessing,
                                  DEFAULT_CTS_RESULTS_GSURI,
                                  DEFAULT_CTS_APFE_GSURI)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.
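
    Only results from release builds of recognized CTS/GTS suites are
    considered valid for upload.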

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS result but the suite is not an 'arc-cts*',
    # 'arc-gts*', 'bvt-arc*', 'bvt-perbuild*', 'cros_test_platform*' or
    # 'test_that_wrapper*' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or
            suite.startswith('arc-gts') or
            suite.startswith('bvt-arc') or
            suite.startswith('bvt-perbuild') or
            suite.startswith('cros_test_platform') or
            suite.startswith('test_that_wrapper')):
        return False

    return True


def _is_test_collector(package):
    """Return True if the test run is just to collect a list of CTS tests.

    @param package: Autotest package name,
                    e.g. cheets_CTS_N.CtsGraphicsTestCase.

    @return Bool flag indicating whether the test package is a CTS list
            generator.
    """
    return TEST_LIST_COLLECTOR in package


def _get_swarming_req_dir(path):
    """Return the parent directory of |path|, if |path| is a swarming task
    result.

    @param path: Full path to the result of a task,
                 e.g. /tmp/results/swarming-44466815c4bc951/1

    @return String of the parent dir, or None if not a swarming task.
    """
    m_parent = re.match(
        '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
    if m_parent:
        return m_parent.group('parent_dir')
    return None


def _parse_cts_job_results_file_path(path):
    """Parse CTS file paths and extract required information from them."""

    # Autotest paths look like:
    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    # Swarming paths look like:
    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    folders = path.split(os.sep)
    if 'swarming' in folders[1]:
        # Swarming job and attempt combined.
        job_id = "%s-%s" % (folders[-7], folders[-6])
    else:
        job_id = folders[-6]

    cts_package = folders[-4]
    timestamp = folders[-1]

    return job_id, cts_package, timestamp


def _upload_files(host, path, result_pattern, multiprocessing,
                  result_gs_bucket, apfe_gs_bucket):
    """Upload CTS/GTS results for one timestamped result directory.

    @param host: Host results directory containing the job keyvals.
    @param path: Path to the timestamped CTS/GTS results directory.
    @param result_pattern: XML result file pattern.
    @param multiprocessing: True to turn on the -m option for gsutil.
    @param result_gs_bucket: Google Storage bucket URI for XML results.
    @param apfe_gs_bucket: Google Storage bucket URI for APFE zip files.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    host_keyval = models.test.parse_host_keyval(host, keyval.get('hostname'))
    labels = urllib.unquote(host_keyval.get('labels'))
    try:
        host_model_name = re.search(r'model:(\w+)', labels).group(1)
    except AttributeError:
        logging.error('Model name attribute is missing in %s/host_keyval/%s.',
                      host, keyval.get('hostname'))
        return

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload the current folder; return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    job_id, package, timestamp = _parse_cts_job_results_file_path(path)

    # Results produced by the CTS test list collector are dummy results.
    # They don't need to be copied to the APFE bucket, which is mainly used
    # for CTS APFE submission.
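    # Upload the timestamped .zip to the APFE bucket only for real test runs;
    # the gzipped XML result upload further below happens in both cases,
    # whenever result_gs_bucket is configured.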
    if not _is_test_collector(package):
        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        index = build.find('-release')
        build_with_model_name = ''
        if index == -1:
            logging.info('Not a release build. '
                         'Non-release build results can be skipped from '
                         'offloading.')
            return

        # The CTS v2 pipeline requires device info in 'board.model' format,
        # e.g. coral.robo360-release, eve.eve-release.
        build_with_model_name = (build[:index] + '.' + host_model_name +
                                 build[index:])

        cts_apfe_gs_path = os.path.join(
            apfe_gs_bucket, build_with_model_name, parent_job_id,
            package, job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob(os.path.join('%s.zip' % path)):
            utils.run(' '.join(_get_cmd_list(
                multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s', zip_file, cts_apfe_gs_path)
    else:
        logging.debug('%s is a CTS test collector Autotest test run.', package)
        logging.debug('Skipping CTS results upload to APFE gs:// bucket.')

    if result_gs_bucket:
        # Path: bucket/cheets_CTS.*/job_id_timestamp/
        # or bucket/cheets_GTS.*/job_id_timestamp/
        test_result_gs_path = os.path.join(
            result_gs_bucket, package, job_id + '_' + timestamp) + '/'

        for test_result_file in glob.glob(os.path.join(path, result_pattern)):
            # gzip test_result_file (testResult.xml/test_result.xml).

            test_result_tgz_file = ''
            if test_result_file.endswith('tgz'):
                # Extract the .xml file from the tgz file for better handling
                # in the CTS dashboard pipeline.
                # TODO(rohitbm): work with the infra team to produce a .gz
                # file so the tgz-to-gz middle conversion is not needed.
                try:
                    with tarfile.open(test_result_file, 'r:gz') as tar_file:
                        tar_file.extract(
                            CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                    test_result_tgz_file = test_result_file
                    test_result_file = os.path.join(
                        path, CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                except tarfile.ReadError as error:
                    logging.debug(error)
                except KeyError as error:
                    logging.debug(error)

            test_result_file_gz = '%s.gz' % test_result_file
            with open(test_result_file, 'r') as f_in, (
                    gzip.open(test_result_file_gz, 'w')) as f_out:
                shutil.copyfileobj(f_in, f_out)
            utils.run(' '.join(_get_cmd_list(
                multiprocessing, test_result_file_gz, test_result_gs_path)))
            logging.debug('Zip and upload %s to %s',
                          test_result_file_gz, test_result_gs_path)
            # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
            os.remove(test_result_file_gz)
            # Remove the extracted test_result.xml file.
            if test_result_tgz_file:
                os.remove(test_result_file)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same
                           one running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google Storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely by two methods:
          * At the time of failure, write enough information to the log
            to allow later debugging, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass. To guarantee effective debugging, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
                 console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google Storage bucket uri to offload to.
        @param multiprocessing: True to turn on the -m option for gsutil.
        @param delete_age: Minimum job age in days before a result can be
                           removed from local storage.
        @param console_client: The cloud console client. If None,
                               cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct the file permission error of the directory,
                    # then raise the exception so gs_offloader can retry
                    # later.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:',
                                dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s',
                                stdout_file.read(), stderr_content)

                # Some result files may have the wrong file permission. Try
                # to correct such errors so a later try can succeed.
                # TODO(dshi): This code was added to correct result files
                # with wrong file permissions caused by bug 511778. After
                # this code is pushed to the lab and has run for a while to
                # clean up these files, the following code and the function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)


    def _try_offload(self, dir_entry, dest_path,
                     stdout_file, stderr_file):
        """Offload the specified directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param stdout_file: Log file for gsutil stdout.
        @param stderr_file: Log file for gsutil stderr.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        config = config_loader.load(dir_entry)
        cts_enabled = True
        if config:
            # TODO(linxinan): use the credential file assigned by the
            # side_effect config.
            if not config.cts.enabled:
                cts_enabled = config.cts.enabled
            if config.google_storage.bucket:
                gs_prefix = ('' if config.google_storage.bucket.startswith(
                    'gs://') else 'gs://')
                self._gs_uri = gs_prefix + config.google_storage.bucket
        else:
            # For now, the absence of the config does not block gs_offloader
            # from uploading files via the default credential.
            logging.debug('Failed to load the side effects config in %s.',
                          dir_entry)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI and cts_enabled:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                    cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed; '
                              'marking offload complete.', cmd)
                _mark_upload_finished(gs_path, stdout_file, stderr_file)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                                       os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process. We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune the directory if it has been uploaded and is expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # The wrong file permission can cause the call
            # `shutil.rmtree(dir_entry)` to raise OSError with the message
            # 'Permission denied'. Details can be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)

    def _prune_swarming_req_dir(self, swarming_req_dir):
        """Prune the swarming request directory, if it is empty.

        @param swarming_req_dir: Directory entry of a swarming request.
        """
        try:
            logging.debug('Pruning swarming request directory %s',
                          swarming_req_dir)
            os.rmdir(swarming_req_dir)
        except OSError as e:
            # Do nothing and leave this directory to the next attempt to
            # remove it.
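            # os.rmdir only succeeds when the directory is empty, so a
            # request directory that still holds un-offloaded results is
            # simply left for a later cycle.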
            logging.debug('Failed to prune swarming request directory %s',
                          swarming_req_dir)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time):
        super(_OffloadError, self).__init__(start_time)
        self.start_time = start_time


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


class OptionalMemoryCache(object):
    """Implements a memory cache if the cachetools module can be loaded.

    If the platform has cachetools available then the cache will
    be created; otherwise the get calls will always act as if there
    was a cache miss, and the add/delete calls will be no-ops.
    """
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache sized based on how long jobs will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted from
        local storage; base the cache size on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # The minimum cache is 1000 items for 10 mins. If the age to
            # delete is 0 days you still want a short / small cache.
            # 2000 items is a good approximation for the max number of jobs
            # a moblab can produce in a day; the lab offloads immediately, so
            # the number of carried jobs should be very small in the normal
            # case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
                                                            ttl=ttl)

    def get(self, key):
        """If we have a cache, try to retrieve from it."""
        if self.cache is not None:
            result = self.cache.get(key)
            return result
        return None

    def add(self, key, value):
        """If we have a cache, try to store the key/value pair."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache, try to remove the key."""
        if self.cache is not None:
            # TTLCache is a MutableMapping and has no `delete` method;
            # pop the key so a missing entry does not raise.
            return self.cache.pop(key, None)


job_timestamp_cache = OptionalMemoryCache()


def _cached_get_timestamp_if_finished(job):
    """Retrieve a job finished timestamp from the cache or the AFE.

    @param job: _JobDirectory instance to retrieve the finished timestamp of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    job_timestamp = job_timestamp_cache.get(job.dirname)
    if not job_timestamp:
        job_timestamp = job.get_timestamp_if_finished()
        if job_timestamp:
            job_timestamp_cache.add(job.dirname, job_timestamp)
    return job_timestamp


def _is_expired(job, age_limit):
    """Return whether a job directory is expired for uploading.

    @param job: _JobDirectory instance.
    @param age_limit: Minimum age in days at which a job may be offloaded.
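
    @returns: True if the job has finished and is older than `age_limit`
              days.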
    """
    job_timestamp = _cached_get_timestamp_if_finished(job)
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
        'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
        fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
        dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether the directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark a directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _mark_upload_finished(gs_path, stdout_file, stderr_file):
    """Mark a given gs_path upload as finished (remotely).

    @param gs_path: gs:// url of the remote directory that has finished
                    upload.
    @param stdout_file: Log file for gsutil stdout.
    @param stderr_file: Log file for gsutil stderr.
    """
    cmd = _get_finish_cmd_list(gs_path)
    process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file)
    process.wait()
    logging.debug('Finished marking as complete %s', cmd)


def _get_uploaded_marker_file(dirpath):
    """Return the path to the upload marker file for a directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Format a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      dummy_file.name)
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                ['gsutil', 'rm',
                 os.path.join(gs_uri,
                              os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping for '
                          '%s seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader: BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes: List of classes of job directory to be
        offloaded.
      * _processes: Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _age_limit: Minimum age in days at which a job may be
        offloaded.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        """Initialize the offloader from parsed command-line options."""
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                self.gs_uri, multiprocessing, self._delete_age_limit,
                console_client)
        classlist = [
            job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pusub_topic = None
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send a new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before. Then, attempt to offload the directories for any
        jobs that have finished running.
        Offload of multiple jobs is done in parallel, up to
        `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories. If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded, as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The dropped file can be used by developers to list jobs that we
        have failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                          for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
            len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
            len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname`, its parent directory, and the job's finished
    timestamp.

    @param job: _JobDirectory instance to offload.
    @param queue: If the job should be offloaded, put the offload
                  parameters into this queue for processing.
    @param age_limit: Minimum age for a job to be offloaded. A value
                      of 0 means that the job will be offloaded as
                      soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = _cached_get_timestamp_if_finished(job)
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
        utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                           'located in the results/hosts subdirectory.')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                           'directories and will not offload them to google '
                           'storage. NOTE: If global_config variable '
                           'CROS.gs_offloading_enabled is False, '
                           '--delete_only is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                           'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                           'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on the -m option for gsutil. If not set, '
                           'the global config setting '
                           'gs_offloader_multiprocessing under the CROS '
                           'section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                           'offloaded, but not removed from local storage.',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                           'removed from local storage.',
                      type='int', default=None)
    parser.add_option(
        '--metrics-file',
        help='If provided, drop metrics to this local file instead of '
             'reporting to ts_mon.',
        type=str,
        default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options, but not both.')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are still available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
        if psutil:
            proc = psutil.Process()
            logging.debug('Set process to ionice IDLE')
            proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keep
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
        log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get the log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
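
    @return Path of the log file to write to.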
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()