1import abc
2import datetime
3import glob
4import json
5import os
6import re
7import shutil
8import time
10import common
11from autotest_lib.client.common_lib import time_utils
12from autotest_lib.client.common_lib import utils
13from autotest_lib.server.cros.dynamic_suite import constants
14from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
# Module-wide frontend handle, created once when this module is imported.
# The job directory classes in this file use it to look up jobs, host queue
# entries and special tasks (get_jobs, get_host_queue_entries,
# get_special_tasks).
# NOTE(review): presumably RetryingAFE retries failed RPCs - confirm against
# frontend_wrappers.
_AFE = frontend_wrappers.RetryingAFE()
# Regular expressions used to pull a numeric id out of a results path.
# They are raw strings so that `\d` reaches the regex engine verbatim and is
# not treated as an (invalid) string escape sequence.

# Special task results: .../hosts/<hostname>/<task id>-<task name>.
# Group 1 captures the task id.
SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'

# Job results: a path component of the form <job id>-<text>.
# Group 1 captures the job id.
JOB_PATTERN = r'.*/(\d+)-[^/]+'
22def _is_job_expired(age_limit, timestamp):
23  """Check whether a job timestamp is older than an age limit.
25  @param age_limit: Minimum age, measured in days.  If the value is
26                    not positive, the job is always expired.
27  @param timestamp: Timestamp of the job whose age we are checking.
28                    The format must match time_utils.TIME_FMT.
30  @returns True iff the job is old enough to be expired.
31  """
32  if age_limit <= 0:
33    return True
34  job_time = time_utils.time_string_to_datetime(timestamp)
35  expiration = job_time + datetime.timedelta(days=age_limit)
36  return datetime.datetime.now() >= expiration
def get_job_id_or_task_id(result_dir):
    """Extract job id or special task id from result_dir.

    @param result_dir: path to the result dir.
            For test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: integer representing the job id or task id. Returns None if
              result_dir is empty or no job or task id can be parsed from it.
    """
    if not result_dir:
        return None
    # abspath also normalizes the path (e.g. drops a trailing slash) before
    # the patterns below are applied.
    result_dir = os.path.abspath(result_dir)
    # Result folder for job running inside container has only job id.
    # Raw string: `\d` must reach the regex engine, not the string parser.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last pattern of number-text. This avoids
    # issue with path like 123-results/456-debug_user, in which 456 is the real
    # job ID.
    m_job = re.findall(JOB_PATTERN, result_dir)
    if m_job:
        return int(m_job[-1])
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return int(m_special_task.group(1))
    # A bare numeric directory name is only accepted as a job id when running
    # inside a container.
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return int(m_ssp_job_pattern.group(1))
    return None
71class _JobDirectory(object):
72  """State associated with a job to be offloaded.
74  The full life-cycle of a job (including failure events that
75  normally don't occur) looks like this:
76   1. The job's results directory is discovered by
77      `get_job_directories()`, and a job instance is created for it.
78   2. Calls to `offload()` have no effect so long as the job
79      isn't complete in the database and the job isn't expired
80      according to the `age_limit` parameter.
81   3. Eventually, the job is both finished and expired.  The next
82      call to `offload()` makes the first attempt to offload the
83      directory to GS.  Offload is attempted, but fails to complete
84      (e.g. because of a GS problem).
85   4. After the first failed offload `is_offloaded()` is false,
86      but `is_reportable()` is also false, so the failure is not
87      reported.
88   5. Another call to `offload()` again tries to offload the
89      directory, and again fails.
90   6. After a second failure, `is_offloaded()` is false and
91      `is_reportable()` is true, so the failure generates an e-mail
92      notification.
93   7. Finally, a call to `offload()` succeeds, and the directory no
94      longer exists.  Now `is_offloaded()` is true, so the job
95      instance is deleted, and future failures will not mention this
96      directory any more.
98  Only steps 1. and 7. are guaranteed to occur.  The others depend
99  on the timing of calls to `offload()`, and on the reliability of
100  the actual offload process.
102  """
104  __metaclass__ = abc.ABCMeta
106  GLOB_PATTERN = None   # must be redefined in subclass
108  def __init__(self, resultsdir):
109    self._dirname = resultsdir
110    self._id = get_job_id_or_task_id(resultsdir)
111    self._offload_count = 0
112    self._first_offload_start = 0
114  @classmethod
115  def get_job_directories(cls):
116    """Return a list of directories of jobs that need offloading."""
117    return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]
119  @abc.abstractmethod
120  def get_timestamp_if_finished(self):
121    """Return this job's timestamp from the database.
123    If the database has not marked the job as finished, return
124    `None`.  Otherwise, return a timestamp for the job.  The
125    timestamp is to be used to determine expiration in
126    `_is_job_expired()`.
128    @return Return `None` if the job is still running; otherwise
129            return a string with a timestamp in the appropriate
130            format.
131    """
132    raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")
134  def enqueue_offload(self, queue, age_limit):
135    """Enqueue the job for offload, if it's eligible.
137    The job is eligible for offloading if the database has marked
138    it finished, and the job is older than the `age_limit`
139    parameter.
141    If the job is eligible, offload processing is requested by
142    passing the `queue` parameter's `put()` method a sequence with
143    the job's `_dirname` attribute and its directory name.
145    @param queue     If the job should be offloaded, put the offload
146                     parameters into this queue for processing.
147    @param age_limit Minimum age for a job to be offloaded.  A value
148                     of 0 means that the job will be offloaded as
149                     soon as it is finished.
151    """
152    if not self._offload_count:
153      timestamp = self.get_timestamp_if_finished()
154      if not timestamp:
155        return
156      if not _is_job_expired(age_limit, timestamp):
157        return
158      self._first_offload_start = time.time()
159    self._offload_count += 1
160    if self.process_gs_instructions():
161      queue.put([self._dirname, os.path.dirname(self._dirname)])
163  def is_offloaded(self):
164    """Return whether this job has been successfully offloaded."""
165    return not os.path.exists(self._dirname)
167  def is_reportable(self):
168    """Return whether this job has a reportable failure."""
169    return self._offload_count > 1
171  def get_failure_time(self):
172    """Return the time of the first offload failure."""
173    return self._first_offload_start
175  def get_failure_count(self):
176    """Return the number of times this job has failed to offload."""
177    return self._offload_count
179  def get_job_directory(self):
180    """Return the name of this job's results directory."""
181    return self._dirname
183  def process_gs_instructions(self):
184    """Process any gs_offloader instructions for this special task.
186    @returns True/False if there is anything left to offload.
187    """
188    # Default support is to still offload the directory.
189    return True
class RegularJobDirectory(_JobDirectory):
  """_JobDirectory specialization for regular test jobs."""

  GLOB_PATTERN = '[0-9]*-*'

  def process_gs_instructions(self):
    """Apply the gs_offloader instructions found in this job's tests.

    Every immediate subdirectory of the job directory may hold an
    instructions file.  When that file sets the no-offload flag, the
    subdirectory is deleted.  If the job directory ends up empty it
    is deleted as well.

    @returns True/False if there is anything left to offload.
    """
    instructions_glob = os.path.join(self._dirname, '*',
                                     constants.GS_OFFLOADER_INSTRUCTIONS)
    for instructions_file in glob.glob(instructions_glob):
      with open(instructions_file, 'r') as handle:
        instructions = json.load(handle)
      if instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
        # Drop the whole test directory that holds the instructions file.
        shutil.rmtree(os.path.dirname(instructions_file))

    # Anything still present must be offloaded.
    if os.listdir(self._dirname):
      return True
    # The job directory is empty: remove it rather than offloading it.
    shutil.rmtree(self._dirname)
    return False

  def get_timestamp_if_finished(self):
    """Get the timestamp to use for finished jobs.

    @returns None if the job is not finished.  Otherwise the latest
             hqe finished_on time, or the job's created_on time when
             no hqe has a finished_on time.
    """
    finished_jobs = _AFE.get_jobs(id=self._id, finished=True)
    if not finished_jobs:
      return None
    finished_hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
                                                job_id=self._id)
    if finished_hqes:
      # A job usually has one HQE but may have several; use the latest.
      return max(hqe.finished_on for hqe in finished_hqes)
    return finished_jobs[0].created_on
class SpecialJobDirectory(_JobDirectory):
  """_JobDirectory specialization for special (per-host) jobs."""

  GLOB_PATTERN = 'hosts/*/[0-9]*-*'

  def __init__(self, resultsdir):
    super(SpecialJobDirectory, self).__init__(resultsdir)

  def get_timestamp_if_finished(self):
    """Get the finish time of this special task, if it is complete.

    @returns the time_finished of the completed special task with
             this id, or None if no completed task is found.
    """
    completed_tasks = _AFE.get_special_tasks(id=self._id, is_complete=True)
    if completed_tasks:
      return completed_tasks[0].time_finished
    return None