#!/usr/bin/python2
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
try:
    import cachetools
except ImportError:
    cachetools = None
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import time
import urllib

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from autotest_lib.utils.side_effects import config_loader
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be imported
# after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Niceness setting for the process; the higher the number, the lower the
# priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of jobs that gs_offloader failed to offload.
The last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
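# TIMESTAMP_PATTERN expands to a glob matching CTS/GTS result timestamp
# directories such as '2016.04.28_01.41.44' (see the example paths in
# _parse_cts_job_results_file_path below).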
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_COMPRESSED_RESULT_PATTERN = 'testResult.xml.tgz'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
CTS_V2_COMPRESSED_RESULT_PATTERN = 'test_result.xml.tgz'

CTS_COMPRESSED_RESULT_TYPES = {
        CTS_COMPRESSED_RESULT_PATTERN: CTS_RESULT_PATTERN,
        CTS_V2_COMPRESSED_RESULT_PATTERN: CTS_V2_RESULT_PATTERN}

# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')
DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsbvt_apfe_server', default='')
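# If cts_results_server is left at its default of '', _try_offload skips the
# CTS/GTS upload step entirely; see the DEFAULT_CTS_RESULTS_GSURI check there.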

# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'


def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including board
    and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory, use the first one
        # available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd
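# For illustration only (hypothetical path and bucket, and assuming the
# gs_offloader_use_rsync option is off):
#   _get_cmd_list(True, '/results/118-debug', 'gs://some-bucket/')
#   returns ['gsutil', '-m', 'cp', '-eR', '/results/118-debug',
#            'gs://some-bucket/'].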


def _get_finish_cmd_list(gs_path):
    """Returns a command to remotely mark a given gs path as finished.

    @param gs_path: Location in google storage where the offload directory
                    should be marked as finished.

    @return A command list to be executed by Popen.
    """
    target = os.path.join(gs_path, '.finished_offload')
    return [
        'gsutil',
        'cp',
        '/dev/null',
        target,
        ]


def sanitize_dir(dirpath):
    """Sanitize a directory for gs upload.

    Symlinks and FIFOs are converted to regular files so gsutil can copy
    the directory without errors.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory contents to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a fifo with a normal file.

    @param path: Fifo path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating fifo marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating symlink marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files allowed in a result directory before its folders
# are compressed.
_MAX_FILE_COUNT = 3000
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
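# Folders with these names are excluded by _get_zippable_folders(), so
# limit_file_count() never tars them up and e.g. the debug logs stay
# directly browsable.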


def _get_zippable_folders(dir_entry):
    """Return the list of folders in dir_entry that may be compressed."""
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.',
                        dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For test jobs, zip folders at the second level, e.g. 123-debug/host1,
    # so the autoserv debug folder remains accessible.
    # Special tasks do not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Compress a directory into a tarball and remove the original.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the
    autoserv process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry])
        subprocess.check_call(
                ['find', dir_entry, '-type', 'd',
                 '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload CTS/GTS test results to separate gs buckets.

    Uploads the testResult.xml.gz/test_result.xml.gz file to the
    cts_results_bucket and the timestamp.zip to the cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
                            (cts_path, CTS_COMPRESSED_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_COMPRESSED_RESULT_PATTERN),
                            (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    # Treat BVT and non-BVT CTS test results the same: offload
                    # them to both the APFE and result buckets. More details
                    # in b/172869794. We will make this more structured when
                    # moving to synchronous offloading.
                    _upload_files(host, path, result_pattern,
                                  multiprocessing,
                                  DEFAULT_CTS_RESULTS_GSURI,
                                  DEFAULT_CTS_APFE_GSURI)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a cts result but not an 'arc-cts*' or
    # 'test_that_wrapper' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or
            suite.startswith('arc-gts') or
            suite.startswith('bvt-arc') or
            suite.startswith('bvt-perbuild') or
            suite.startswith('cros_test_platform') or
            suite.startswith('test_that_wrapper')):
        return False

    return True
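# For illustration only (hypothetical values): build='lumpy-release/R77-12371.0.0'
# with suite='arc-cts-qual' is accepted, while any 'trybot-...' build is
# rejected by the release-build check above.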


def _is_test_collector(package):
    """Returns true if the test run is just to collect the list of CTS tests.

    @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase

    @return Bool flag indicating whether a test package is the CTS list
            generator or not.
    """
    return TEST_LIST_COLLECTOR in package


def _get_swarming_req_dir(path):
    """
    Returns the parent directory of |path|, if |path| is a swarming task result.

    @param path: Full path to the result of a task.
                      e.g. /tmp/results/swarming-44466815c4bc951/1

    @return string of the parent dir or None if not a swarming task.
    """
    m_parent = re.match(
            '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
    if m_parent:
        return m_parent.group('parent_dir')
    return None


def _parse_cts_job_results_file_path(path):
    """Parse CTS file paths and extract the required information from them."""

    # Autotest paths look like:
    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    # Swarming paths look like:
    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    folders = path.split(os.sep)
    if 'swarming' in folders[1]:
        # Swarming job id and attempt combined.
        job_id = "%s-%s" % (folders[-7], folders[-6])
    else:
        job_id = folders[-6]

    cts_package = folders[-4]
    timestamp = folders[-1]

    return job_id, cts_package, timestamp
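# For the Autotest example path above, this returns
# ('317739475-chromeos-test', 'cheets_CTS.android.dpi', '2016.04.28_01.41.44').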


def _upload_files(host, path, result_pattern, multiprocessing,
                  result_gs_bucket, apfe_gs_bucket):
    """Upload CTS/GTS results at |path| to the given gs buckets.

    @param host: Path to the host's results folder.
    @param path: Path to the timestamped CTS/GTS results directory.
    @param result_pattern: XML result file pattern to upload.
    @param multiprocessing: True to turn on -m option for gsutil.
    @param result_gs_bucket: GS bucket for the raw XML results.
    @param apfe_gs_bucket: GS bucket for the APFE zip files.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    host_keyval = models.test.parse_host_keyval(host, keyval.get('hostname'))
    labels = urllib.unquote(host_keyval.get('labels'))
    try:
        host_model_name = re.search(r'model:(\w+)', labels).group(1)
    except AttributeError:
        logging.error('Model name attribute is missing in %s/host_keyval/%s.',
                      host, keyval.get('hostname'))
        return

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload the current folder, return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    job_id, package, timestamp = _parse_cts_job_results_file_path(path)

    # Results produced by the CTS test list collector are dummy results.
    # They don't need to be copied to the APFE bucket, which is mainly used
    # for CTS APFE submission.
    if not _is_test_collector(package):
        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        index = build.find('-release')
        build_with_model_name = ''
        if index == -1:
            logging.info('Not a release build. Non-release build results '
                         'can be skipped from offloading.')
            return

        # CTS v2 pipeline requires device info in 'board.model' format.
        # e.g. coral.robo360-release, eve.eve-release
        build_with_model_name = (build[:index] + '.' + host_model_name +
                                     build[index:])

        cts_apfe_gs_path = os.path.join(
                apfe_gs_bucket, build_with_model_name, parent_job_id,
                package, job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob('%s.zip' % path):
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s', zip_file, cts_apfe_gs_path)
    else:
        logging.debug('%s is a CTS test collector Autotest test run.', package)
        logging.debug('Skipping CTS results upload to APFE gs:// bucket.')

    if result_gs_bucket:
        # Path: bucket/cheets_CTS.*/job_id_timestamp/
        # or bucket/cheets_GTS.*/job_id_timestamp/
        test_result_gs_path = os.path.join(
                result_gs_bucket, package, job_id + '_' + timestamp) + '/'

        for test_result_file in glob.glob(os.path.join(path, result_pattern)):
            # gzip test_result_file (testResult.xml/test_result.xml).
            test_result_tgz_file = ''
            if test_result_file.endswith('tgz'):
                # Extract the .xml file from the tgz file for better handling
                # in the CTS dashboard pipeline.
                # TODO(rohitbm): work with infra team to produce .gz file so
                # tgz to gz middle conversion is not needed.
                try:
                    with tarfile.open(test_result_file, 'r:gz') as tar_file:
                        tar_file.extract(
                                CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                        test_result_tgz_file = test_result_file
                        test_result_file = os.path.join(path,
                                CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                except tarfile.ReadError as error:
                    logging.debug(error)
                except KeyError as error:
                    logging.debug(error)

            test_result_file_gz = '%s.gz' % test_result_file
            with open(test_result_file, 'r') as f_in, (
                    gzip.open(test_result_file_gz, 'w')) as f_out:
                shutil.copyfileobj(f_in, f_out)
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, test_result_file_gz, test_result_gs_path)))
            logging.debug('Zip and upload %s to %s',
                          test_result_file_gz, test_result_gs_path)
            # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
            os.remove(test_result_file_gz)
            # Remove the extracted test_result.xml file.
            if test_result_tgz_file:
                os.remove(test_result_file)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely by two methods:
          * At the time of failure, write enough information to the log
            to allow later debug, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass.  To guarantee effective debug, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
            console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Minimum job age in days before a result can be
                           removed from local storage.
        @param console_client: The cloud console client. If None,
          cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Try to fix the permission error on the directory, then
                    # try the offload again.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have wrong file permissions. Try
                # to correct such errors so a later try can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)


    def _try_offload(self, dir_entry, dest_path,
                 stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        config = config_loader.load(dir_entry)
        cts_enabled = True
        if config:
            # TODO(linxinan): use the credential file assigned by the
            # side_effect config.
            if not config.cts.enabled:
                cts_enabled = config.cts.enabled
            if config.google_storage.bucket:
                gs_prefix = ('' if config.google_storage.bucket.startswith(
                        'gs://') else 'gs://')
                self._gs_uri = gs_prefix + config.google_storage.bucket
        else:
            # For now, the absence of a config does not block gs_offloader
            # from uploading files via the default credential.
            logging.debug('Failed to load the side effects config in %s.',
                          dir_entry)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI and cts_enabled:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                    cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed; '
                              'marking offload complete.', cmd)
                _mark_upload_finished(gs_path, stdout_file, stderr_file)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                        os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune the directory if it has been uploaded and is expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause `shutil.rmtree(dir_entry)`
            # to raise OSError with the message 'Permission denied'.
            # Details can be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)

    def _prune_swarming_req_dir(self, swarming_req_dir):
        """Prune the swarming request directory, if it is empty.

        @param swarming_req_dir: Directory entry of a swarming request.
        """
        try:
            logging.debug('Pruning swarming request directory %s',
                          swarming_req_dir)
            os.rmdir(swarming_req_dir)
        except OSError:
            # Do nothing and leave this directory to the next attempt to
            # remove it.
            logging.debug('Failed to prune swarming request directory %s',
                          swarming_req_dir)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time):
        super(_OffloadError, self).__init__(start_time)
        self.start_time = start_time


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


class OptionalMemoryCache(object):
    """Implements a memory cache if the cachetools module can be loaded.

    If the platform has cachetools available then the cache will
    be created, otherwise the get calls will always act as if there
    was a cache miss and the set/delete calls will be no-ops.
    """
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache sized based on how long jobs will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted from
        local storage, so the cache size is based on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # The minimum cache is 1000 items for 10 mins; if the age to
            # delete is 0 days you still want a short-lived, small cache.
            # 2000 items is a good approximation for the max number of jobs
            # a moblab can produce in a day. The lab offloads immediately,
            # so the number of carried jobs should be very small in the
            # normal case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            self.cache = cachetools.TTLCache(maxsize=maxsize, ttl=ttl)

    def get(self, key):
        """If we have a cache try to retrieve from it."""
        if self.cache is not None:
            result = self.cache.get(key)
            return result
        return None

    def add(self, key, value):
        """If we have a cache try to store key/value."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache try to remove a key."""
        if self.cache is not None:
            # TTLCache has no delete() method; pop() removes the key if it
            # is present and is a no-op otherwise.
            return self.cache.pop(key, None)


job_timestamp_cache = OptionalMemoryCache()
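# Module-level singleton shared by _cached_get_timestamp_if_finished() and
# GSOffloader._prune(). It stays a no-op until main() calls setup(), which
# only happens when --enable_timestamp_cache is passed.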


def _cached_get_timestamp_if_finished(job):
    """Retrieve a job's finished timestamp from the cache or the AFE.

    @param job: _JobDirectory instance to retrieve the finished timestamp of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    job_timestamp = job_timestamp_cache.get(job.dirname)
    if not job_timestamp:
        job_timestamp = job.get_timestamp_if_finished()
        if job_timestamp:
            job_timestamp_cache.add(job.dirname, job_timestamp)
    return job_timestamp


def _is_expired(job, age_limit):
    """Return whether the job directory is old enough to be offloaded.

    @param job: _JobDirectory instance.
    @param age_limit:  Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = _cached_get_timestamp_if_finished(job)
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether the directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark a directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _mark_upload_finished(gs_path, stdout_file, stderr_file):
    """Mark a given gs_path upload as finished (remotely).

    @param gs_path: gs:// url of the remote directory that has finished
                    upload.
    @param stdout_file: Log file for the gsutil process's stdout.
    @param stderr_file: Log file for the gsutil process's stderr.
    """
    cmd = _get_finish_cmd_list(gs_path)
    process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file)
    process.wait()
    logging.debug('Finished marking as complete %s', cmd)


def _get_uploaded_marker_file(dirpath):
    """Return the path to the upload marker file for a directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)
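# The .GS_UPLOADED marker is the offloader's idempotency record: _try_offload()
# returns early for directories that already carry it, and _prune() only
# deletes directories that carry it.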


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data
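# For illustration only (hypothetical values): a job first seen failing at
# 2016-04-28 01:41:44 with offload_count 3 and dirname '118-debug' renders as
# '2016-04-28 01:41:44      3  118-debug'.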


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      dummy_file.name)
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping for %s '
                          'seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which an offloaded job
        may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = [
                job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pubsub_topic = None
        self._offload_count_limit = 3
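        # A job that fails to offload _offload_count_limit times is marked as
        # uploaded by _give_up_on_jobs_over_limit() and dropped from future
        # offload cycles.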


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send a new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The dropped file can be used by developers to list jobs that we have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute, its parent directory, and the
    job's finished timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = _cached_get_timestamp_if_finished(job)
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts '
               'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options

def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keeps
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get the log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()