#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import socket
import stat
import subprocess
import sys
import tarfile
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be
# imported after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for the process; the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
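# The pattern above expands to a glob matching result directories named
# like '2017.01.01_12.00.00' (an assumed example of the
# YYYY.MM.DD_HH.MM.SS timestamp form used by CTS/GTS result directories).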
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')

# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'

def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including
    board and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first
        # one available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields

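# A minimal illustration of the fields computed above, assuming a job keyval
# whose 'build' value is the hypothetical 'candy-release/R59-9333.0.0':
#     _get_metrics_fields('/results/123-debug')
#     => {'board': 'candy', 'milestone': 'R59'}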

def _get_es_metadata(dir_entry):
    """Get ES metadata for the given test result directory.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metadata to be uploaded.
    """
    fields = _get_metrics_fields(dir_entry)
    fields['hostname'] = socket.gethostname()
    # Include more data about the test job in metadata.
    if dir_entry:
        fields['dir_entry'] = dir_entry
        fields['job_id'] = job_directories.get_job_id_or_task_id(dir_entry)

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on the -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in Google Storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd
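
# A sketch of the command produced above, with an illustrative directory and
# bucket (the bucket name is an assumption, not a project default): with
# rsync disabled,
#     _get_cmd_list(True, '/results/123-debug', 'gs://some-bucket/')
#     => ['gsutil', '-m', 'cp', '-eR', '/results/123-debug',
#         'gs://some-bucket/']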


def sanitize_dir(dirpath):
    """Sanitize a directory for gs upload.

    Filenames are escaped, and symlinks and FIFOs are converted to
    regular files so the directory can be uploaded safely.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory contents to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert FIFOs to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a FIFO with a normal file.

    @param path: FIFO path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)

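# For example (hypothetical paths): after sanitizing, a symlink
# 'sysinfo/var/log -> /var/log' becomes a regular file containing
# '<symlink to /var/log>', and a FIFO becomes a file containing '<FIFO>'.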

# Maximum number of files in the folder.
_MAX_FILE_COUNT = 500
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.',
                        dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For a test job, zip folders at the second level, e.g. 123-debug/host1,
    # so that the autoserv debug folder remains accessible.
    # A special task does not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make a directory into a tarball.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)
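
# Illustrative behavior (hypothetical path):
#     _make_into_tarball('/results/123-debug/host1')
# writes '/results/123-debug/host1.tgz' with the contents archived under
# 'host1/', then removes the original directory.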


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the autoserv
    process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+r', dir_entry])
        subprocess.check_call(
                ['find', dir_entry, '-type', 'd',
                 '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz files to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on the -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [
                (cts_path, CTS_RESULT_PATTERN),
                (cts_v2_path, CTS_V2_RESULT_PATTERN),
                (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    _upload_files(host, path, result_pattern, multiprocessing)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS result but the suite is not 'arc-cts*',
    # 'arc-gts*', or 'test_that_wrapper'.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or suite.startswith('arc-gts') or
            suite.startswith('test_that_wrapper')):
        return False

    return True

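# Hedged examples of the check above (build and suite values are
# hypothetical):
#     _is_valid_result('candy-release/R59-9333.0.0',
#                      CTS_RESULT_PATTERN, 'arc-cts')          # True
#     _is_valid_result('trybot-candy-release/R59-9333.0.0',
#                      CTS_RESULT_PATTERN, 'arc-cts')          # False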

def _is_test_collector(package):
    """Returns True if the test run only collects the list of CTS tests.

    @param package: Autotest package name, e.g. cheets_CTS_N.CtsGraphicsTestCase

    @return Bool flag indicating whether the test package is the CTS test
            list collector.
    """
    return TEST_LIST_COLLECTOR in package


def _upload_files(host, path, result_pattern, multiprocessing):
    """Upload CTS/GTS result files and zips for one result path.

    @param host: Path to the host directory under the job results folder.
    @param path: Path to the timestamped CTS/GTS result directory.
    @param result_pattern: XML result file pattern.
    @param multiprocessing: True to turn on the -m option for gsutil.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload current folder, return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    folders = path.split(os.sep)
    job_id = folders[-6]
    package = folders[-4]
    timestamp = folders[-1]
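
    # Illustrative path layout (an assumed example): for a path like
    # '.../123-debug/host1/cheets_CTS_N/results/android-cts/2017.01.01_12.00.00',
    # folders[-6] is the job id ('123-debug'), folders[-4] the test package
    # ('cheets_CTS_N'), and folders[-1] the timestamp directory.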

    # Results produced by the CTS test list collector are dummy results.
    # They need not be copied to the APFE bucket, which is used mainly for
    # CTS APFE submission.
    if not _is_test_collector(package):
        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        cts_apfe_gs_path = os.path.join(
                DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
                package, job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob(os.path.join('%s.zip' % path)):
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s', zip_file, cts_apfe_gs_path)
    else:
        logging.debug('%s is a CTS test collector Autotest test run.', package)
        logging.debug('Skipping CTS results upload to APFE gs:// bucket.')

    # Path: bucket/cheets_CTS.*/job_id_timestamp/
    # or bucket/cheets_GTS.*/job_id_timestamp/
    test_result_gs_path = os.path.join(
            DEFAULT_CTS_RESULTS_GSURI, package,
            job_id + '_' + timestamp) + '/'

    for test_result_file in glob.glob(os.path.join(path, result_pattern)):
        # Gzip the test_result_file (testResult.xml/test_result.xml).
        test_result_file_gz = '%s.gz' % test_result_file
        with open(test_result_file, 'rb') as f_in, (
                gzip.open(test_result_file_gz, 'wb')) as f_out:
            shutil.copyfileobj(f_in, f_out)
        utils.run(' '.join(_get_cmd_list(
                multiprocessing, test_result_file_gz, test_result_gs_path)))
        logging.debug('Zip and upload %s to %s',
                      test_result_file_gz, test_result_gs_path)
        # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
        os.remove(test_result_file_gz)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google Storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely in two ways:
          * At the time of failure, write enough information to the log
            to allow later debugging, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass.  To guarantee effective debugging, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """

class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
                 console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google Storage bucket URI to offload to.
        @param multiprocessing: True to turn on the -m option for gsutil.
        @param delete_age: Minimum age in days before an uploaded directory
            may be deleted from local storage.
        @param console_client: The cloud console client. If None,
            cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct the directory's file permission error, then
                    # retry the offload below.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have wrong file permissions. Try
                # to correct such errors so a later try can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)

    def _try_offload(self, dir_entry, dest_path,
                     stdout_file, stderr_file):
        """Offload the specified directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we will
                          offload the directory.
        @param stdout_file: Log file for the offload process's stdout.
        @param stderr_file: Log file for the offload process's stderr.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        es_metadata = _get_es_metadata(dir_entry)
        error_obj = _OffloadError(start_time, es_metadata)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)
            es_metadata['size_kb'] = file_utils.get_directory_size_kibibytes(
                    dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                process = subprocess.Popen(
                        _get_cmd_list(self._multiprocessing, dir_entry,
                                      gs_path),
                        stdout=stdout_file, stderr=stderr_file)
                process.wait()

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                                       os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune the directory if it is uploaded and expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            shutil.rmtree(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause `shutil.rmtree(dir_entry)`
            # to raise OSError with the message 'Permission denied'.
            # Details can be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time, es_metadata):
        super(_OffloadError, self).__init__(start_time, es_metadata)
        self.start_time = start_time
        self.es_metadata = es_metadata


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in Google Storage where we would
                          have offloaded the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


def _is_expired(job, age_limit):
    """Return whether the job directory is old enough to be offloaded.

    @param job: _JobDirectory instance.
    @param age_limit: Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = job.get_timestamp_if_finished()
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _get_uploaded_marker_file(dirpath):
    """Return path to upload marker file for directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data
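
# Illustrative output of the formatter above (hypothetical job): a job whose
# first offload attempt started at 2017-01-01 12:00:00, with offload_count 3
# and dirname '123-debug', formats as
#     '2017-01-01 12:00:00      3  123-debug\n'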


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            logging.debug('Unable to offload to %s, sleeping.', gs_uri)
            time.sleep(120)


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which an offloaded
        job may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pubsub_topic = None
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if (
                        resultsdir in self._open_jobs
                        or _is_uploaded(resultsdir)):
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The dropped file can be used by developers to list jobs that have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence
    containing the job's `dirname`, its parent directory, and the
    job's completion timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = job.get_timestamp_if_finished()
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])
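
# Illustrative call (hypothetical directory): for a finished special task at
# 'hosts/host1/123-verify', the queue receives
#     ['hosts/host1/123-verify', 'hosts/host1', <finished timestamp>]
# which maps onto offload(dir_entry, dest_path, job_complete_time).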


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keeps
    # one backup just in case.
    # options.log_size is documented as megabytes (see --log_size).
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * 1024 * options.log_size,
            backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)
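
# For example (hypothetical run): a 'jobs' offloader started without a log
# size limit at 2017-01-01 12:00:00 would log to
#     /usr/local/autotest/logs/gs_offloader_jobs_log_20170101_120000.txt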


if __name__ == '__main__':
    main()