import abc
import datetime
import glob
import json
import os
import re
import shutil
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers


_AFE = frontend_wrappers.RetryingAFE()

SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'
JOB_PATTERN = r'.*/(\d+)-[^/]+'
# Pattern of a job folder, e.g., 123-debug_user, where 123 is the job id and
# debug_user is the name of the user who started the job.
JOB_FOLDER_PATTERN = r'.*/(\d+-[^/]+)'


def is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days. If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True iff the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration


def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from result_dir.

    @param result_dir: Path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: Integer job id or task id, or None if no job or task id can be
              parsed from the result_dir.
    """
    if not result_dir:
        return
    result_dir = os.path.abspath(result_dir)
    # The result folder of a job running inside a container contains only the
    # job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last occurrence of the number-text
    # pattern. This avoids issues with paths like 123-results/456-debug_user,
    # in which 456 is the real job ID.
    m_job = re.findall(JOB_PATTERN, result_dir)
    if m_job:
        return int(m_job[-1])
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return int(m_special_task.group(1))
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return int(m_ssp_job_pattern.group(1))


def get_job_folder_name(result_dir):
    """Extract the folder name of a job from result_dir.

    @param result_dir: Path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: The name of the job's folder, or None if nothing matching
              JOB_FOLDER_PATTERN can be parsed from the result_dir.
    """
    if not result_dir:
        return
    m_job = re.findall(JOB_FOLDER_PATTERN, result_dir)
    if m_job:
        return m_job[-1]
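

# Illustrative examples (a sketch, not part of the upstream module): given the
# path layouts documented in the docstrings above, the parsers return values
# like the following.
#
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> 2032
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   # -> 1343
#   get_job_folder_name(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   # -> '2032-chromeos-test'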


class _JobDirectory(object):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
     1. The job's results directory is discovered by
        `get_job_directories()`, and a job instance is created for it.
     2. Calls to `offload()` have no effect so long as the job
        isn't complete in the database or isn't expired according
        to the `age_limit` parameter.
     3. Eventually, the job is both finished and expired. The next
        call to `offload()` makes the first attempt to offload the
        directory to GS. Offload is attempted, but fails to complete
        (e.g. because of a GS problem).
     4. Finally, a call to `offload()` succeeds, and the directory no
        longer exists. Now `is_offloaded()` is true, so the job
        instance is deleted, and future failures will not mention this
        directory any more.

    Only steps 1. and 4. are guaranteed to occur. The others depend
    on the timing of calls to `offload()`, and on the reliability of
    the actual offload process.

    """

    __metaclass__ = abc.ABCMeta

    GLOB_PATTERN = None  # must be redefined in subclass

    def __init__(self, resultsdir):
        self._dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self._offload_count = 0
        self._first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`. Otherwise, return a timestamp for the job. The
        timestamp is to be used to determine expiration in
        `is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def enqueue_offload(self, queue, age_limit):
        """Enqueue the job for offload, if it's eligible.

        The job is eligible for offloading if the database has marked
        it finished, and the job is older than the `age_limit`
        parameter.

        If the job is eligible, offload processing is requested by
        passing the `queue` parameter's `put()` method a sequence with
        the job's `_dirname` attribute, its parent directory, and the
        finished timestamp.

        @param queue If the job should be offloaded, put the offload
                     parameters into this queue for processing.
        @param age_limit Minimum age for a job to be offloaded. A value
                         of 0 means that the job will be offloaded as
                         soon as it is finished.

        """
        timestamp = self.get_timestamp_if_finished()
        if not self._offload_count:
            if not timestamp:
                return
            if not is_job_expired(age_limit, timestamp):
                return
            self._first_offload_start = time.time()
        self._offload_count += 1
        if self.process_gs_instructions():
            queue.put([self._dirname, os.path.dirname(self._dirname),
                       timestamp])

    def is_offloaded(self):
        """Return whether this job has been successfully offloaded."""
        return not os.path.exists(self._dirname)

    def get_failure_time(self):
        """Return the time of the first offload failure."""
        return self._first_offload_start

    def get_failure_count(self):
        """Return the number of times this job has failed to offload."""
        return self._offload_count

    def get_job_directory(self):
        """Return the name of this job's results directory."""
        return self._dirname

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # By default the directory is still offloaded.
        return True
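

# A minimal concrete subclass only needs a GLOB_PATTERN and an implementation
# of get_timestamp_if_finished() that returns a time_utils.TIME_FMT string
# (or None while the job is still running). Illustrative sketch only; the
# _lookup_finish_time() helper below is hypothetical and stands in for
# whatever database query a real subclass would perform.
#
#   class _ExampleJobDirectory(_JobDirectory):
#       GLOB_PATTERN = '[0-9]*-*'
#
#       def get_timestamp_if_finished(self):
#           return _lookup_finish_time(self._id)  # None until finished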


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Go through the gs_offloader instructions file for each test in
        # this job.
        for path in glob.glob(os.path.join(
                self._dirname, '*', constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                dirname = os.path.dirname(path)
                shutil.rmtree(dirname)
                os.mkdir(dirname)
                breadcrumb_name = os.path.join(dirname,
                                               'logs-removed-readme.txt')
                with open(breadcrumb_name, 'w') as f:
                    f.write(NO_OFFLOAD_README)

        # Finally check whether there is anything left to offload.
        if not os.listdir(self._dirname):
            shutil.rmtree(self._dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns The latest HQE finished_on time. If the finished_on times
                 are all null, returns the job's created_on time.
        """
        entry = _AFE.get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
                                           job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most jobs have one HQE, some can have multiple, so check
        # them all.
        return max([hqe.finished_on for hqe in hqes])


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        entry = _AFE.get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None
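

# How a caller such as gs_offloader might drive these classes (illustrative
# sketch, not the actual gs_offloader code): scan for job directories, ask
# each job to enqueue itself once it is finished and old enough, and drop the
# jobs whose directories have disappeared after a successful offload. The
# multiprocessing.Queue here is assumed to be drained by separate offload
# worker processes.
#
#   import multiprocessing
#
#   def _example_offload_pass(age_limit_days):
#       queue = multiprocessing.Queue()
#       jobs = [RegularJobDirectory(d)
#               for d in RegularJobDirectory.get_job_directories()]
#       for job in jobs:
#           job.enqueue_offload(queue, age_limit_days)
#       # Directories that no longer exist were offloaded (or deleted per
#       # gs_offloader instructions) and need no further tracking.
#       return [job for job in jobs if not job.is_offloaded()]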