import abc
import datetime
import glob
import json
import os
import re
import shutil
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers


_AFE = frontend_wrappers.RetryingAFE()

SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'
JOB_PATTERN = r'.*/(\d+)-[^/]+'
# Pattern of a job folder, e.g., 123-debug_user, where 123 is the job id and
# debug_user is the name of the user who started the job.
JOB_FOLDER_PATTERN = r'.*/(\d+-[^/]+)'

def is_job_expired(age_limit, timestamp):
  """Check whether a job timestamp is older than an age limit.

  @param age_limit: Minimum age, measured in days.  If the value is
                    not positive, the job is always expired.
  @param timestamp: Timestamp of the job whose age we are checking.
                    The format must match time_utils.TIME_FMT.

  @returns True iff the job is old enough to be expired.
  """
  if age_limit <= 0:
    return True
  job_time = time_utils.time_string_to_datetime(timestamp)
  expiration = job_time + datetime.timedelta(days=age_limit)
  return datetime.datetime.now() >= expiration
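
# Illustrative usage (the exact timestamp layout is whatever
# time_utils.TIME_FMT specifies; the value below is only an example):
#
#   is_job_expired(0, '2016-01-01 00:00:00')   # -> True (non-positive limit).
#   is_job_expired(30, '2016-01-01 00:00:00')  # -> True once 30 days have passed.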


def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from a result_dir.

    @param result_dir: path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: Integer job id or task id, or None if no id can be parsed
              from the result_dir.
    """
    if not result_dir:
        return
    result_dir = os.path.abspath(result_dir)
    # The result folder for a job running inside a container contains only the
    # job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last "<number>-<text>" component. This
    # avoids issues with paths like 123-results/456-debug_user, in which 456 is
    # the real job ID.
    m_job = re.findall(JOB_PATTERN, result_dir)
    if m_job:
        return int(m_job[-1])
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return int(m_special_task.group(1))
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return int(m_ssp_job_pattern.group(1))
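
# Illustrative examples, using the paths from the docstring above:
#
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> 2032
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   # -> 1343
#   # Inside a container, a bare '.../results/123' directory yields 123.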


def get_job_folder_name(result_dir):
    """Extract the folder name of a job from result_dir.

    @param result_dir: path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: The name of the job's folder, or None if no name matching
            JOB_FOLDER_PATTERN can be parsed from the result_dir.
    """
    if not result_dir:
        return
    m_job = re.findall(JOB_FOLDER_PATTERN, result_dir)
    if m_job:
        return m_job[-1]
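
# Illustrative examples, again using the docstring paths:
#
#   get_job_folder_name(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> '2032-chromeos-test'
#   get_job_folder_name(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   # -> '1343-cleanup'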


class _JobDirectory(object):
  """State associated with a job to be offloaded.

  The full life-cycle of a job (including failure events that
  normally don't occur) looks like this:
   1. The job's results directory is discovered by
      `get_job_directories()`, and a job instance is created for it.
   2. Calls to `offload()` have no effect so long as the job
      isn't complete in the database and the job isn't expired
      according to the `age_limit` parameter.
   3. Eventually, the job is both finished and expired.  The next
      call to `offload()` makes the first attempt to offload the
      directory to GS.  Offload is attempted, but fails to complete
      (e.g. because of a GS problem).
   4. Finally, a call to `offload()` succeeds, and the directory no
      longer exists.  Now `is_offloaded()` is true, so the job
      instance is deleted, and future failures will not mention this
      directory any more.

  Only steps 1. and 4. are guaranteed to occur.  The others depend
  on the timing of calls to `offload()`, and on the reliability of
  the actual offload process.

  """

  __metaclass__ = abc.ABCMeta

  GLOB_PATTERN = None   # must be redefined in subclass

  def __init__(self, resultsdir):
    self._dirname = resultsdir
    self._id = get_job_id_or_task_id(resultsdir)
    self._offload_count = 0
    self._first_offload_start = 0

  @classmethod
  def get_job_directories(cls):
    """Return a list of directories of jobs that need offloading."""
    return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

  @abc.abstractmethod
  def get_timestamp_if_finished(self):
    """Return this job's timestamp from the database.

    If the database has not marked the job as finished, return
    `None`.  Otherwise, return a timestamp for the job.  The
    timestamp is to be used to determine expiration in
    `is_job_expired()`.

    @return Return `None` if the job is still running; otherwise
            return a string with a timestamp in the appropriate
            format.
    """
    raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

  def enqueue_offload(self, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence
    containing the job's `_dirname` attribute, its parent directory,
    and the job's finished timestamp.

    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    timestamp = self.get_timestamp_if_finished()
    if not self._offload_count:
      if not timestamp:
        return
      if not is_job_expired(age_limit, timestamp):
        return
      self._first_offload_start = time.time()
    self._offload_count += 1
    if self.process_gs_instructions():
      queue.put([self._dirname, os.path.dirname(self._dirname), timestamp])
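
  # For example, a finished job whose results live at
  # /usr/local/autotest/results/2032-chromeos-test would be queued as
  #   ['/usr/local/autotest/results/2032-chromeos-test',
  #    '/usr/local/autotest/results',
  #    <timestamp returned by get_timestamp_if_finished()>]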

  def is_offloaded(self):
    """Return whether this job has been successfully offloaded."""
    return not os.path.exists(self._dirname)

  def get_failure_time(self):
    """Return the time of the first offload failure."""
    return self._first_offload_start

  def get_failure_count(self):
    """Return the number of times this job has failed to offload."""
    return self._offload_count

  def get_job_directory(self):
    """Return the name of this job's results directory."""
    return self._dirname

  def process_gs_instructions(self):
    """Process any gs_offloader instructions for this job.

    @returns True if there is anything left to offload, False otherwise.
    """
    # Default behavior is to offload the directory as-is.
    return True


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
  """Subclass of _JobDirectory for regular test jobs."""

  GLOB_PATTERN = '[0-9]*-*'

  def process_gs_instructions(self):
    """Process any gs_offloader instructions for this job.

    @returns True if there is anything left to offload, False otherwise.
    """
    # Go through the gs_offloader instructions file for each test in this job.
    for path in glob.glob(os.path.join(self._dirname, '*',
                                       constants.GS_OFFLOADER_INSTRUCTIONS)):
      with open(path, 'r') as f:
        gs_off_instructions = json.load(f)
      if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
        dirname = os.path.dirname(path)
        shutil.rmtree(dirname)
        os.mkdir(dirname)
        breadcrumb_name = os.path.join(dirname, 'logs-removed-readme.txt')
        with open(breadcrumb_name, 'w') as f:
          f.write(NO_OFFLOAD_README)

    # Finally, check whether there's anything left to offload.
    if not os.listdir(self._dirname):
      shutil.rmtree(self._dirname)
      return False
    return True
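
  # The per-test instructions file named by constants.GS_OFFLOADER_INSTRUCTIONS
  # is a small JSON dictionary.  Only constants.GS_OFFLOADER_NO_OFFLOAD is
  # consulted here; an illustrative file (assuming that constant's value is
  # 'no_offload') would contain:
  #   {"no_offload": true}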

  def get_timestamp_if_finished(self):
    """Get the timestamp to use for finished jobs.

    @returns The latest hqe finished_on time.  If the finished_on times are
             all null, returns the job's created_on time.
    """
    entry = _AFE.get_jobs(id=self._id, finished=True)
    if not entry:
      return None
    hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
                                       job_id=self._id)
    if not hqes:
      return entry[0].created_on
    # While most jobs have one HQE, some can have multiple, so check them all.
    return max([hqe.finished_on for hqe in hqes])


class SpecialJobDirectory(_JobDirectory):
  """Subclass of _JobDirectory for special (per-host) jobs."""

  GLOB_PATTERN = 'hosts/*/[0-9]*-*'

  def __init__(self, resultsdir):
    super(SpecialJobDirectory, self).__init__(resultsdir)

  def get_timestamp_if_finished(self):
    """Return the task's time_finished if it is complete, else None."""
    entry = _AFE.get_special_tasks(id=self._id, is_complete=True)
    return entry[0].time_finished if entry else None
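

# A minimal sketch of how a caller might drive these classes (illustrative
# only; the real gs_offloader supplies its own queue, age limit and worker
# processes, and runs from the results root, since GLOB_PATTERN is relative):
#
#   import multiprocessing
#
#   queue = multiprocessing.JoinableQueue()
#   jobs = [RegularJobDirectory(d)
#           for d in RegularJobDirectory.get_job_directories()]
#   jobs += [SpecialJobDirectory(d)
#            for d in SpecialJobDirectory.get_job_directories()]
#   for job in jobs:
#       job.enqueue_offload(queue, age_limit=1)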