1import abc
2import datetime
3import glob
4import json
5import logging
6import os
7import re
8import shutil
9
10import common
11from autotest_lib.client.common_lib import time_utils
12from autotest_lib.client.common_lib import utils
13from autotest_lib.server.cros.dynamic_suite import constants
14from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
15
16try:
17    from chromite.lib import metrics
18except ImportError:
19    metrics = utils.metrics_mock
20
21
22SPECIAL_TASK_PATTERN = '.*/hosts/[^/]+/(\d+)-[^/]+'
23
24def is_job_expired(age_limit, timestamp):
25    """Check whether a job timestamp is older than an age limit.
26
27  @param age_limit: Minimum age, measured in days.  If the value is
28                    not positive, the job is always expired.
29  @param timestamp: Timestamp of the job whose age we are checking.
30                    The format must match time_utils.TIME_FMT.
31
32  @returns True iff the job is old enough to be expired.
33  """
34    if age_limit <= 0:
35        return True
36    job_time = time_utils.time_string_to_datetime(timestamp)
37    expiration = job_time + datetime.timedelta(days=age_limit)
38    return datetime.datetime.now() >= expiration
39
40
41def get_job_id_or_task_id(result_dir):
42    """Extract job id or special task id from result_dir
43
44    @param result_dir: path to the result dir.
45            For test job:
46            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
47            The hostname at the end is optional.
48            For special task:
49            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup
50
51    @returns: str representing the job id or task id. Returns None if fail
52        to parse job or task id from the result_dir.
53    """
54    if not result_dir:
55        return
56    result_dir = os.path.abspath(result_dir)
57    # Result folder for job running inside container has only job id.
58    ssp_job_pattern = '.*/(\d+)$'
59    # Try to get the job ID from the last pattern of number-text. This avoids
60    # issue with path like 123-results/456-debug_user, in which 456 is the real
61    # job ID.
62    m_job = re.findall('.*/(\d+)-[^/]+', result_dir)
63    if m_job:
64        return m_job[-1]
65    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
66    if m_special_task:
67        return m_special_task.group(1)
68    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
69    if m_ssp_job_pattern and utils.is_in_container():
70        return m_ssp_job_pattern.group(1)
71    return _get_swarming_run_id(result_dir)
72
73
74def _get_swarming_run_id(path):
75    """Extract the Swarming run_id for a Skylab task from the result path."""
76    # Legacy swarming results are in directories like
77    #   .../results/swarming-3e4391423c3a4311
78    # In particular, the ending digit is never 0
79    m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
80    if m_legacy_path:
81        return m_legacy_path.group(1)
82    # New style swarming results are in directories like
83    #   .../results/swarming-3e4391423c3a4310/1
84    # - Results are one directory deeper.
85    # - Ending digit of first directory is always 0.
86    m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
87    if m_path:
88        return m_path.group(1) + m_path.group(2)
89    return None
90
91
92class _JobDirectory(object):
93    """State associated with a job to be offloaded.
94
95  The full life-cycle of a job (including failure events that
96  normally don't occur) looks like this:
97   1. The job's results directory is discovered by
98      `get_job_directories()`, and a job instance is created for it.
99   2. Calls to `offload()` have no effect so long as the job
100      isn't complete in the database and the job isn't expired
101      according to the `age_limit` parameter.
102   3. Eventually, the job is both finished and expired.  The next
103      call to `offload()` makes the first attempt to offload the
104      directory to GS.  Offload is attempted, but fails to complete
105      (e.g. because of a GS problem).
106   4. Finally, a call to `offload()` succeeds, and the directory no
107      longer exists.  Now `is_offloaded()` is true, so the job
108      instance is deleted, and future failures will not mention this
109      directory any more.
110
111  Only steps 1. and 4. are guaranteed to occur.  The others depend
112  on the timing of calls to `offload()`, and on the reliability of
113  the actual offload process.
114
115  """
116
117    __metaclass__ = abc.ABCMeta
118
119    GLOB_PATTERN = None  # must be redefined in subclass
120
121    def __init__(self, resultsdir):
122        self.dirname = resultsdir
123        self._id = get_job_id_or_task_id(resultsdir)
124        self.offload_count = 0
125        self.first_offload_start = 0
126
127    @classmethod
128    def get_job_directories(cls):
129        """Return a list of directories of jobs that need offloading."""
130        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]
131
132    @abc.abstractmethod
133    def get_timestamp_if_finished(self):
134        """Return this job's timestamp from the database.
135
136    If the database has not marked the job as finished, return
137    `None`.  Otherwise, return a timestamp for the job.  The
138    timestamp is to be used to determine expiration in
139    `is_job_expired()`.
140
141    @return Return `None` if the job is still running; otherwise
142            return a string with a timestamp in the appropriate
143            format.
144    """
145        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")
146
147    def process_gs_instructions(self):
148        """Process any gs_offloader instructions for this special task.
149
150    @returns True/False if there is anything left to offload.
151    """
152        # Default support is to still offload the directory.
153        return True
154
155
156NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
157This is the expected behavior for passing jobs from the Commit Queue."""
158
159
160class RegularJobDirectory(_JobDirectory):
161    """Subclass of _JobDirectory for regular test jobs."""
162
163    GLOB_PATTERN = '[0-9]*-*'
164
165    def process_gs_instructions(self):
166        """Process any gs_offloader instructions for this job.
167
168    @returns True/False if there is anything left to offload.
169    """
170        # Go through the gs_offloader instructions file for each test in this job.
171        for path in glob.glob(
172                os.path.join(self.dirname, '*',
173                             constants.GS_OFFLOADER_INSTRUCTIONS)):
174            with open(path, 'r') as f:
175                gs_off_instructions = json.load(f)
176            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
177                dirname = os.path.dirname(path)
178                _remove_log_directory_contents(dirname)
179
180        # Finally check if there's anything left to offload.
181        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
182            shutil.rmtree(self.dirname)
183            return False
184        return True
185
186    def get_timestamp_if_finished(self):
187        """Get the timestamp to use for finished jobs.
188
189    @returns the latest hqe finished_on time. If the finished_on times are null
190             returns the job's created_on time.
191    """
192        entry = _cached_afe().get_jobs(id=self._id, finished=True)
193        if not entry:
194            return None
195        hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
196                                                    job_id=self._id)
197        if not hqes:
198            return entry[0].created_on
199        # While most Jobs have 1 HQE, some can have multiple, so check them all.
200        return max([hqe.finished_on for hqe in hqes])
201
202
203def _remove_log_directory_contents(dirpath):
204    """Remove log directory contents.
205
206    Leave a note explaining what has happened to the logs.
207
208    @param dirpath: Path to log directory.
209    """
210    shutil.rmtree(dirpath)
211    os.mkdir(dirpath)
212    breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
213    with open(breadcrumb_name, 'w') as f:
214        f.write(NO_OFFLOAD_README)
215
216
217class SpecialJobDirectory(_JobDirectory):
218    """Subclass of _JobDirectory for special (per-host) jobs."""
219
220    GLOB_PATTERN = 'hosts/*/[0-9]*-*'
221
222    def __init__(self, resultsdir):
223        super(SpecialJobDirectory, self).__init__(resultsdir)
224
225    def get_timestamp_if_finished(self):
226        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
227        return entry[0].time_finished if entry else None
228
229
230def _find_results_dir(dirname):
231    subdirs = []
232    for root, dirs, files in os.walk(dirname, topdown=True):
233        for f in files:
234            if f == _OFFLOAD_MARKER:
235                subdirs.append(root)
236    return subdirs
237
238
239_OFFLOAD_MARKER = ".ready_for_offload"
240_marker_parse_error_metric = metrics.Counter(
241    'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
242    description='Errors parsing the offload marker file')
243
244
245class SwarmingJobDirectory(_JobDirectory):
246    """Subclass of _JobDirectory for Skylab swarming jobs."""
247
248    @classmethod
249    def get_job_directories(cls):
250        """Return a list of directories of jobs that need offloading."""
251        # Legacy swarming results are in directories like
252        #   .../results/swarming-3e4391423c3a4311
253        # In particular, the ending digit is never 0
254        jobdirs = [
255                d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
256                if os.path.isdir(d)
257        ]
258        # New style swarming results are in directories like
259        #   .../results/swarming-3e4391423c3a4310/1
260        # - Results are one directory deeper.
261        # - Ending digit of first directory is always 0.
262        new_style_topdir = [
263                d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
264        ]
265        # When there are multiple tests run in one test_runner build,
266        # the results will be one level deeper with the test_id
267        # as one further subdirectory.
268        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
269        for topdir in new_style_topdir:
270            for d in glob.glob('%s/[1-9a-f]*' % topdir):
271                subdirs = _find_results_dir(d)
272                jobdirs += subdirs
273
274        return jobdirs
275
276    def get_timestamp_if_finished(self):
277        """Get the timestamp to use for finished jobs.
278
279    @returns the latest hqe finished_on time. If the finished_on times are null
280             returns the job's created_on time.
281    """
282        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
283        try:
284            with open(marker_path) as f:
285                ts_string = f.read().strip()
286        except:
287            return None
288        try:
289            ts = int(ts_string)
290            return time_utils.epoch_time_to_date_string(ts)
291        except ValueError as e:
292            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
293                          self.dirname, e)
294            _marker_parse_error_metric.increment()
295            return None
296
297
298_AFE = None
299def _cached_afe():
300    global _AFE
301    if _AFE is None:
302        _AFE = frontend_wrappers.RetryingAFE()
303    return _AFE
304