1#!/usr/bin/python
2#
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Script to archive old Autotest results to Google Storage.
8
9Uses gsutil to archive files to the configured Google Storage bucket.
10Upon successful copy, the local results directory is deleted.
11"""
12
13import datetime
14import errno
15import logging
16import logging.handlers
17import os
18import re
19import shutil
20import signal
21import socket
22import subprocess
23import sys
24import tempfile
25import time
26
27from optparse import OptionParser
28
29import common
30from autotest_lib.client.common_lib import error
31from autotest_lib.client.common_lib import utils
32from autotest_lib.site_utils import job_directories
33
34try:
35    # Does not exist, nor is needed, on moblab.
36    import psutil
37except ImportError:
38    psutil = None
39
40import job_directories
41from autotest_lib.client.common_lib import global_config
42from autotest_lib.client.common_lib.cros.graphite import autotest_stats
43from autotest_lib.scheduler import email_manager
44from chromite.lib import parallel
45
46
# Whether results are uploaded to Google Storage at all.  Read from the
# CROS section of the global config; parse_options() uses the negation
# of this value as the default for --delete_only.
GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Stats key for this offloader: the local hostname with every '.'
# replaced by '_'.
STATS_KEY = 'gs_offloader.%s' % socket.gethostname().replace('.', '_')
# Value of the '_type' field in the metadata sent with the
# 'kibibytes_transferred' gauge.
METADATA_TYPE = 'result_dir_size'

# Timer whose decorate() wraps the offload_dir function built by
# get_offload_dir_func().
timer = autotest_stats.Timer(STATS_KEY)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Number of seconds main() sleeps between two offload cycles.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'

# Hosts sub-directory that contains cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

# Directory that receives the offloader log file (see main()).
LOG_LOCATION = '/usr/local/autotest/logs/'
# Log file name; filled with the offloader type and a timestamp.
LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
# strftime() format of the timestamp embedded in the log file name.
LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
# Format of each record written to the log file.
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Address that receives the offload failure reports.
# pylint: disable=E1120
NOTIFY_ADDRESS = global_config.global_config.get_config_value(
        'SCHEDULER', 'notify_email', default='')

# Pieces of the failure report e-mail assembled by
# report_offload_failures(): the subject takes the hostname, the report
# header embeds the triage URL and is followed by one formatted row per
# failed job.
ERROR_EMAIL_HELPER_URL = 'http://go/cros-triage-gsoffloader'
ERROR_EMAIL_SUBJECT_FORMAT = 'GS Offloader notifications from %s'
ERROR_EMAIL_REPORT_FORMAT = '''\
gs_offloader is failing to offload results directories.

Check %s to triage the issue.

First failure       Count   Directory name
=================== ======  ==============================
''' % ERROR_EMAIL_HELPER_URL
# --+----1----+----  ----+  ----+----1----+----2----+----3

# One report row: first failure time, failure count, job directory.
ERROR_EMAIL_DIRECTORY_FORMAT = '%19s  %5d  %-1s\n'
# strftime() format of the first failure time in a report row.
ERROR_EMAIL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# When True, get_cmd_list() builds a `gsutil rsync` command instead of
# `gsutil cp`.
USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

# According to https://cloud.google.com/storage/docs/bucket-naming#objectnames
# Individual characters that get_sanitized_name() replaces.
INVALID_GS_CHARS = ['[', ']', '*', '?', '#']
# Inclusive (first, last) character-code ranges that
# get_sanitized_name() replaces.
INVALID_GS_CHAR_RANGE = [(0x00, 0x1F), (0x7F, 0x84), (0x86, 0xFF)]

# limit_file_count() leaves a directory untouched while `find` reports
# fewer entries than this.
MAX_FILE_COUNT = 500
# Folder names that _get_zippable_folders() never returns.
FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs']
# Whether offload_dir calls limit_file_count() before uploading.
LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)
111
class TimeoutException(Exception):
    """Signals that an offload ran past its time limit.

    Raised by timeout_handler() when SIGALRM is delivered.
    """
115
116
def timeout_handler(_signum, _frame):
    """Turn a delivered SIGALRM into a TimeoutException.

    main() installs this function as the SIGALRM handler; the alarm is
    armed around the upload in offload_dir.

    @param _signum: Number of the signal that was caught (14 for
                    SIGALRM).  Unused.
    @param _frame: Stack frame that was executing when the signal
                   arrived.  Unused.

    @raise TimeoutException: Always, so that the try/except around the
                             Popen call sees the time out.
    """
    timed_out = TimeoutException('Process Timed Out')
    raise timed_out
129
130
def get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Build the gsutil command line that offloads one directory.

    Depending on USE_RSYNC_ENABLED the command is either
    `gsutil rsync` (targeting a sub-path of gs_path named after
    dir_entry) or `gsutil cp` (targeting gs_path itself).

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path to be offloaded.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    prefix = ['gsutil', '-m'] if multiprocessing else ['gsutil']
    if USE_RSYNC_ENABLED:
        subcommand = 'rsync'
        destination = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        subcommand = 'cp'
        destination = gs_path
    return prefix + [subcommand, '-eR', dir_entry, destination]
153
154
def get_directory_size_kibibytes_cmd_list(directory):
    """Build the `du` command that reports a directory's total size.

    @param directory: Path to the directory to be measured.

    @return Argument list suitable for subprocess.Popen.
    """
    # Kept as a separate function so that unittests can mock it.
    du_cmd = ['du', '-sk']
    du_cmd.append(directory)
    return du_cmd
160
161
def get_directory_size_kibibytes(directory):
    """Calculate the total size of a directory with all its contents.

    The size is obtained by running the command returned by
    get_directory_size_kibibytes_cmd_list().  The result is used for
    statistics only, so a failing command or unparsable output is
    logged and reported as a size of 0 rather than raised.

    @param directory: Path to the directory

    @return Size of the directory in kibibytes, or 0 if the size could
            not be determined.
    """
    cmd = get_directory_size_kibibytes_cmd_list(directory)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout_data, stderr_data = process.communicate()

    if process.returncode != 0:
        # This function is used for statistics only, if it fails,
        # nothing else should crash.
        logging.warning('Getting size of %s failed. Stderr:', directory)
        logging.warning(stderr_data)
        return 0

    # `du -sk` prints '<size><TAB><path>'; the size is the first
    # whitespace-separated token.  Empty or malformed output must not
    # escape as an exception, for the same reason as above.
    try:
        return int(stdout_data.split(None, 1)[0])
    except (IndexError, ValueError):
        logging.warning('Getting size of %s failed. Unexpected output: %r',
                        directory, stdout_data)
        return 0
183
184
def get_sanitized_name(name):
    """Percent-encode every character of a name that is invalid in GS.

    A character is invalid when it is listed in INVALID_GS_CHARS or its
    code lies in one of the INVALID_GS_CHAR_RANGE ranges.  Each such
    character is replaced by '%' followed by its two-digit lowercase
    hex code.

    @param name: Name to be processed.

    @return A string with all invalid characters in the name being
             replaced.
    """
    explicit_chars = ''.join(re.escape(char) for char in INVALID_GS_CHARS)
    char_ranges = ''.join(r'\x%02x-\x%02x' % (bounds[0], bounds[1])
                          for bounds in INVALID_GS_CHAR_RANGE)
    invalid = re.compile('[%s]' % (explicit_chars + char_ranges))

    def _percent_encode(match):
        return '%%%02x' % ord(match.group(0))

    return invalid.sub(_percent_encode, name)
198
199
def sanitize_dir(dir_entry):
    """Replace all invalid characters in folder and file names with valid ones.

    Every file and folder below dir_entry whose name is changed by
    get_sanitized_name() is renamed on disk.  dir_entry itself is never
    renamed.  Nothing happens if dir_entry does not exist.

    @param dir_entry: Directory entry to be sanitized.
    """
    if not os.path.exists(dir_entry):
        return
    renames = []
    prefix_len = len(dir_entry)
    for root, dirs, files in os.walk(dir_entry):
        # Renames are applied after the walk, in walk order, so parent
        # folders are already renamed by the time their children are
        # moved; child paths are therefore built from the sanitized
        # root.  Only the part of root below dir_entry is sanitized,
        # because dir_entry itself keeps its name: sanitizing the whole
        # path would point at a non-existent location whenever
        # dir_entry contains an invalid character.
        sanitized_root = dir_entry + get_sanitized_name(root[prefix_len:])
        for name in dirs + files:
            sanitized_name = get_sanitized_name(name)
            if name != sanitized_name:
                orig_path = os.path.join(sanitized_root, name)
                rename_path = os.path.join(sanitized_root,
                                           sanitized_name)
                renames.append((orig_path, rename_path))
    for src, dest in renames:
        logging.warning('Invalid character found. Renaming %s to %s.',
                        src, dest)
        shutil.move(src, dest)
221
222
def _get_zippable_folders(dir_entry):
    """List the direct children of a directory that may be compressed.

    A child qualifies when it is not a regular file and its name is not
    listed in FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory whose children are examined.

    @return List of paths, each being dir_entry joined with the name of
            a qualifying child.
    """
    zippable = []
    for entry in os.listdir(dir_entry):
        entry_path = os.path.join(dir_entry, entry)
        if os.path.isfile(entry_path) or entry in FOLDERS_NEVER_ZIP:
            continue
        zippable.append(entry_path)
    return zippable
231
232
def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    The method counts the entries `find` reports for the given
    directory.  If the count is at least MAX_FILE_COUNT, the method
    compresses each folder in the given directory into `<folder>.tgz`
    and removes the original folder, except folders in
    FOLDERS_NEVER_ZIP.  A folder that fails to compress is logged and
    left in place.

    @param dir_entry: Directory entry to be checked.
    """
    count = utils.run('find "%s" | wc -l' % dir_entry,
                      ignore_status=True).stdout.strip()
    try:
        count = int(count)
    except (ValueError, TypeError):
        # The exception types must be a tuple: the Python 2 form
        # `except ValueError, TypeError:` catches ValueError only and
        # binds the caught exception to the name TypeError.
        logging.warning('Fail to get the file count in folder %s.',
                        dir_entry)
        return
    if count < MAX_FILE_COUNT:
        return

    # For test job, zip folders in a second level, e.g. 123-debug/host1.
    # This is to allow autoserv debug folder still be accessible.
    # For special task, it does not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        try:
            zip_name = '%s.tgz' % folder
            utils.run('tar -cz -C "%s" -f "%s" "%s"' %
                      (os.path.dirname(folder), zip_name,
                       os.path.basename(folder)))
        except error.CmdError as e:
            logging.error('Fail to compress folder %s. Error: %s',
                          folder, e)
            continue
        # Only remove the folder once its archive was created.
        shutil.rmtree(folder)
278
279
def correct_results_folder_permission(dir_entry):
    """Give the current user and group ownership of a results folder.

    For tests running with server-side packaging, the results folder has
    the owner of root. This must be changed to the user running the
    autoserv process, so parsing job can access the results folder.

    Runs `sudo -n chown -R` and then `sudo -n chgrp -R` on the folder.
    A failing command is logged and ends the attempt; it is not raised.
    Nothing is done for an empty or None dir_entry.

    @param dir_entry: Path to the results folder.
    """
    if dir_entry:
        ownership_fixes = (('chown', os.getuid), ('chgrp', os.getgid))
        try:
            for tool, get_id in ownership_fixes:
                subprocess.check_call(
                        ['sudo', '-n', tool, '-R', str(get_id()), dir_entry])
        except subprocess.CalledProcessError as e:
            logging.error('Failed to modify permission for %s: %s',
                          dir_entry, e)
299
300
def get_offload_dir_func(gs_uri, multiprocessing):
    """Returns the offload directory function for the given gs_uri

    @param gs_uri: Google storage bucket uri to offload to.
    @param multiprocessing: True to turn on -m option for gsutil.

    @return offload_dir function to perform the offload.
    """
    @timer.decorate
    def offload_dir(dir_entry, dest_path):
        """Offload the specified directory entry to Google storage.

        The directory is sanitized (and optionally has its file count
        limited), uploaded with the command from get_cmd_list(), and
        removed from local disk once the upload succeeds.  The upload
        is aborted after OFFLOAD_TIMEOUT_SECS seconds.  Timeouts,
        upload failures and OSErrors are logged, not raised.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.

        """
        # Everything the `except` and `finally` clauses below refer to
        # is initialized before the `try`.  Otherwise an exception
        # raised early (e.g. by sanitize_dir) would be replaced by an
        # UnboundLocalError from the cleanup code.
        offload_failed = False
        process = None
        stdout_file = tempfile.TemporaryFile('w+')
        stderr_file = tempfile.TemporaryFile('w+')
        try:
            counter = autotest_stats.Counter(STATS_KEY)
            counter.increment('jobs_offload_started')

            sanitize_dir(dir_entry)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            # Arm the alarm; timeout_handler raises TimeoutException
            # if the upload has not finished by then.
            signal.alarm(OFFLOAD_TIMEOUT_SECS)
            gs_path = '%s%s' % (gs_uri, dest_path)
            process = subprocess.Popen(
                    get_cmd_list(multiprocessing, dir_entry, gs_path),
                    stdout=stdout_file, stderr=stderr_file)
            process.wait()
            signal.alarm(0)

            if process.returncode == 0:
                dir_size = get_directory_size_kibibytes(dir_entry)

                counter.increment('kibibytes_transferred_total',
                                  dir_size)
                metadata = {
                    '_type': METADATA_TYPE,
                    'size_KB': dir_size,
                    'result_dir': dir_entry,
                    'drone': socket.gethostname().replace('.', '_')
                }
                autotest_stats.Gauge(STATS_KEY, metadata=metadata).send(
                        'kibibytes_transferred', dir_size)
                counter.increment('jobs_offloaded')
                shutil.rmtree(dir_entry)
            else:
                offload_failed = True
        except TimeoutException:
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            offload_failed = True
        except OSError as e:
            # The wrong file permission can lead call
            # `shutil.rmtree(dir_entry)` to raise OSError with message
            # 'Permission denied'. Details can be found in
            # crbug.com/536151
            if e.errno == errno.EACCES:
                logging.warning('Try to correct file permission of %s.',
                                dir_entry)
                correct_results_folder_permission(dir_entry)
            else:
                # Still not raised, but no longer dropped silently.
                logging.error('OSError while offloading %s: %s',
                              dir_entry, e)
        finally:
            signal.alarm(0)
            try:
                if offload_failed:
                    # Rewind the log files for stdout and stderr and log
                    # their contents.
                    stdout_file.seek(0)
                    stderr_file.seek(0)
                    stderr_content = stderr_file.read()
                    logging.error('Error occurred when offloading %s:',
                                  dir_entry)
                    logging.error('Stdout:\n%s \nStderr:\n%s',
                                  stdout_file.read(), stderr_content)
                    # Some result files may have wrong file permission.
                    # Try to correct such error so later try can success.
                    # TODO(dshi): The code is added to correct result
                    # files with wrong file permission caused by bug
                    # 511778. After this code is pushed to lab and run
                    # for a while to clean up these files, following
                    # code and function
                    # correct_results_folder_permission can be deleted.
                    if ('CommandException: Error opening file'
                            in stderr_content):
                        logging.warning(
                                'Try to correct file permission of %s.',
                                dir_entry)
                        correct_results_folder_permission(dir_entry)
            finally:
                stdout_file.close()
                stderr_file.close()
    return offload_dir
405
406
def delete_files(dir_entry, dest_path):
    """Remove dir_entry and everything below it from the filesystem.

    Takes the same arguments as offload_dir so that it can stand in for
    it on systems that only want to delete results instead of
    offloading them.

    @param dir_entry: Directory entry to delete.
    @param dest_path: NOT USED.
    """
    del dest_path  # Accepted only to match offload_dir's signature.
    shutil.rmtree(dir_entry)
418
419
def report_offload_failures(joblist):
    """Send an e-mail describing jobs whose offload has failed.

    The message consists of ERROR_EMAIL_REPORT_FORMAT followed by one
    row per job in `joblist`, sorted as text, and is sent to
    NOTIFY_ADDRESS.

    @param joblist List of jobs to be reported in the message.
    """
    def _format_job(job):
        first_failure = datetime.datetime.fromtimestamp(
                job.get_failure_time())
        return ERROR_EMAIL_DIRECTORY_FORMAT % (
                first_failure.strftime(ERROR_EMAIL_TIME_FORMAT),
                job.get_failure_count(),
                job.get_job_directory())

    report_rows = sorted(_format_job(job) for job in joblist)
    email_manager.manager.send_email(
            NOTIFY_ADDRESS,
            ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname(),
            ERROR_EMAIL_REPORT_FORMAT + ''.join(report_rows))
439
440
def wait_for_gs_write_access(gs_uri):
    """Block until a test upload to Google Storage succeeds.

    Repeatedly uploads a temporary file with the command from
    get_cmd_list() and removes it again with `gsutil rm`.  Whenever
    either command fails, sleeps 120 seconds and tries again.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # The local variable keeps the temporary file alive for the whole
    # loop.
    probe_file = tempfile.NamedTemporaryFile()
    upload_cmd = get_cmd_list(False, probe_file.name, gs_uri)
    remote_probe = os.path.join(gs_uri, os.path.basename(probe_file.name))
    cleanup_cmd = ['gsutil', 'rm', remote_probe]
    have_access = False
    while not have_access:
        try:
            subprocess.check_call(upload_cmd)
            subprocess.check_call(cleanup_cmd)
        except subprocess.CalledProcessError:
            logging.debug('Unable to offload to %s, sleeping.', gs_uri)
            time.sleep(120)
        else:
            have_access = True
461
462
class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * gs_uri:  Google Storage URI that results are offloaded to, or
        None when running in delete-only mode.
      * _offload_func:  Function to call for each attempt to offload
        a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
      * _next_report_time:  Earliest time that we should send e-mail
        if there are failures to be reported.
    """

    def __init__(self, options):
        """Configure the offloader from the parsed command line.

        @param options: Options object as returned by parse_options().
                        The attributes read are delete_only,
                        multiprocessing, process_hosts_only,
                        process_all, parallelism and days_old.
        """
        if options.delete_only:
            # Define gs_uri in this mode too, so that reading the
            # attribute never raises AttributeError.
            self.gs_uri = None
            self._offload_func = delete_files
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            self._offload_func = get_offload_dir_func(
                    self.gs_uri, options.multiprocessing)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._age_limit = options.days_old
        self._open_jobs = {}
        self._next_report_time = time.time()


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Removed offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        # Iterate over a snapshot: entries are deleted from the
        # dictionary inside the loop, which must not happen while
        # iterating over the dictionary itself.
        for jobkey, job in list(self._open_jobs.items()):
            if job.is_offloaded():
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d new jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _have_reportable_errors(self):
        """Return whether any jobs need reporting via e-mail.

        @return True if there are reportable jobs in `self._open_jobs`,
                or False otherwise.
        """
        return any(job.is_reportable()
                   for job in self._open_jobs.values())


    def _update_offload_results(self):
        """Check and report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        Any jobs that have been successfully offloaded are removed.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send new e-mail describing the failures.

        """
        self._remove_offloaded_jobs()
        if self._have_reportable_errors():
            # N.B. We include all jobs that have failed at least once,
            # which may include jobs that aren't otherwise reportable.
            failed_jobs = [j for j in self._open_jobs.values()
                           if j.get_failure_time()]
            logging.debug('Currently there are %d jobs with offload '
                          'failures', len(failed_jobs))
            if time.time() >= self._next_report_time:
                logging.debug('Reporting failures by e-mail')
                report_offload_failures(failed_jobs)
                self._next_report_time = (
                        time.time() + REPORT_INTERVAL_SECS)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        with parallel.BackgroundTaskRunner(
                self._offload_func, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                job.enqueue_offload(queue, self._age_limit)
        self._update_offload_results()
591
592
def parse_options():
    """Parse the command line passed to gs_offloader.

    Prints the help text plus an explanation and exits with status 1
    when both --all and --hosts are given.

    @return The options object produced by OptionParser; positional
            arguments are ignored.
    """
    defaults_text = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    parser = OptionParser('usage: %prog [options]\n' + defaults_text)
    parser.add_option(
            '-a', '--all', dest='process_all', action='store_true',
            help='Offload all files in the results directory.')
    parser.add_option(
            '-s', '--hosts', dest='process_hosts_only',
            action='store_true',
            help='Offload only the special tasks result files '
            'located in the results/hosts subdirectory')
    parser.add_option(
            '-p', '--parallelism', dest='parallelism', type='int',
            default=1,
            help='Number of parallel workers to use.')
    # Deleting without offloading becomes the default when offloading
    # is disabled in the global config.
    parser.add_option(
            '-o', '--delete_only', dest='delete_only',
            action='store_true', default=not GS_OFFLOADING_ENABLED,
            help='GS Offloader will only the delete the '
            'directories and will not offload them to google '
            'storage. NOTE: If global_config variable '
            'CROS.gs_offloading_enabled is False, --delete_only '
            'is automatically True.')
    parser.add_option(
            '-d', '--days_old', dest='days_old', type='int', default=0,
            help='Minimum job age in days before a result can be '
            'offloaded.')
    parser.add_option(
            '-l', '--log_size', dest='log_size', type='int', default=0,
            help='Limit the offloader logs to a specified '
            'number of Mega Bytes.')
    parser.add_option(
            '-m', dest='multiprocessing', action='store_true',
            default=False,
            help='Turn on -m option for gsutil.')
    options, _unused_args = parser.parse_args()
    conflicting = options.process_all and options.process_hosts_only
    if not conflicting:
        return options
    parser.print_help()
    print('Cannot process all files and only the hosts '
          'subdirectory. Please remove an argument.')
    sys.exit(1)
633
634
def main():
    """Run gs_offloader: set up logging and priority, then loop forever.

    After parsing the options, a rotating log file is attached to the
    root logger, the process priority is lowered, the working
    directory is changed to RESULTS_DIR and the SIGALRM handler is
    installed.  Unless running in delete-only mode, the function then
    waits for write access to Google Storage.  Finally it performs
    offload cycles in an endless loop, sleeping SLEEP_TIME_SECS seconds
    after each one.
    """
    options = parse_options()

    # The offloader type is only used to name the log file.
    offloader_type = 'jobs'
    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'

    # A size-limited log always uses the same file name (no timestamp).
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(LOG_TIMESTAMP_FORMAT)
    log_filename = os.path.join(
            LOG_LOCATION,
            LOG_FILENAME_FORMAT % (offloader_type, log_timestamp))

    # Log through a RotatingFileHandler attached to the root logger.
    # If options.log_size is 0, the file size will not be limited.
    # Keeps one backup just in case.
    # NOTE(review): the --log_size help text says megabytes, but the
    # value is multiplied by 1024 only - confirm the intended unit.
    rotating_handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    rotating_handler.setFormatter(logging.Formatter(LOGGING_FORMAT))
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(rotating_handler)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    logging.debug('Set process to nice value: %d', NICENESS)
    os.nice(NICENESS)
    if psutil:
        this_process = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        this_process.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    # timeout_handler turns an expired alarm into a TimeoutException.
    signal.signal(signal.SIGALRM, timeout_handler)

    offloader = Offloader(options)
    if not options.delete_only:
        wait_for_gs_write_access(offloader.gs_uri)
    while True:
        offloader.offload_once()
        time.sleep(SLEEP_TIME_SECS)
685
686if __name__ == '__main__':
687    main()
688