import contextlib
import logging
import os
import sqlite3
import subprocess
import threading
import time
from dataclasses import asdict, dataclass, field


def _read_log(path, fallback):
    """Return the text of the log file at path, or fallback if it is missing."""
    try:
        with open(path, 'r') as log_file:
            return log_file.read()
    except FileNotFoundError:
        return fallback


@dataclass
class JobInfo:
    """
    A class for ota job information.

    The first thirteen fields are declared in the same order as the columns
    selected from the Jobs table by ProcessesManagement, so a database row can
    be unpacked positionally with JobInfo(*row).
    """
    id: str
    target: str
    incremental: str = ''
    verbose: bool = False
    partial: list[str] = field(default_factory=list)
    output: str = ''
    status: str = 'Running'
    downgrade: bool = False
    extra: str = ''
    stdout: str = ''
    stderr: str = ''
    start_time: int = 0
    finish_time: int = 0
    isPartial: bool = False
    isIncremental: bool = False

    def __post_init__(self):
        """
        Normalise values that may come straight from the database:
        integer flags become bools, and a comma separated partial string
        becomes a list. isIncremental / isPartial are switched on when the
        corresponding value is non-empty.
        """
        self.verbose = bool(self.verbose)
        self.downgrade = bool(self.downgrade)
        if isinstance(self.partial, str):
            self.partial = self.partial.split(',') if self.partial else []
        elif not self.partial:
            self.partial = []
        if self.incremental:
            self.isIncremental = True
        if self.partial:
            self.isPartial = True

    def to_sql_form_dict(self):
        """
        Convert this instance to a dict, which can be later used to insert into
        the SQL database.
        Format:
            id: string, target: string, incremental: string, verbose: int,
            partial: string, output:string, status:string,
            downgrade: int, extra: string, stdout: string, stderr:string,
            start_time:int, finish_time: int(not required)
        The dict also carries isPartial / isIncremental, which the INSERT
        statement in this file does not reference.
        """
        sql_form_dict = asdict(self)
        sql_form_dict['partial'] = ','.join(sql_form_dict['partial'])
        for flag in ('verbose', 'downgrade'):
            sql_form_dict[flag] = 1 if sql_form_dict[flag] else 0
        return sql_form_dict

    def to_dict_basic(self):
        """
        Convert the instance to a dict, which includes the file name of target
        (and of the incremental source, when there is one).
        """
        basic_info = asdict(self)
        basic_info['target_name'] = self.target.split('/')[-1]
        if self.isIncremental:
            basic_info['incremental_name'] = self.incremental.split('/')[-1]
        return basic_info

    def to_dict_detail(self, target_lib, offset=0):
        """
        Convert this instance into a dict, which includes some detailed information
        of the target/source build, i.e. build version and file name.
        Args:
            target_lib: object providing get_build_by_path(path); the result
                must expose file_name and build_version attributes.
            offset: unused; kept so existing callers keep working.
        Return:
            dict with the job fields, the contents of the stdout/stderr log
            files, and the name/version of the target (and incremental) build.
        """
        detail_info = asdict(self)
        # Each log is read on its own so that one missing file does not hide
        # the content of the other.
        detail_info['stdout'] = _read_log(
            self.stdout, 'NO STD OUTPUT IS FOUND')
        detail_info['stderr'] = _read_log(
            self.stderr, 'NO STD ERROR IS FOUND')
        target_info = target_lib.get_build_by_path(self.target)
        detail_info['target_name'] = target_info.file_name
        detail_info['target_build_version'] = target_info.build_version
        if self.incremental:
            incremental_info = target_lib.get_build_by_path(
                self.incremental)
            detail_info['incremental_name'] = incremental_info.file_name
            detail_info['incremental_build_version'] = incremental_info.build_version
        return detail_info


class DependencyError(Exception):
    """Raised when an external program needed for OTA generation is unusable."""


class ProcessesManagement:
    """
    A class manage the ota generate process
    """

    # Column list shared by every SELECT so rows always unpack into JobInfo.
    _JOB_COLUMNS = (
        'ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, '
        'Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime')

    @staticmethod
    def check_external_dependencies():
        """
        Verify that java and zip can be executed, printing their versions.
        Raise:
            DependencyError: if either command cannot be run.
        """
        checks = (
            ("Java version:", ["java", "--version"],
             "java not found in PATH. Attempt to generate OTA might fail. "),
            ("Zip version:", ["zip", "-v"],
             "zip command not found in PATH. Attempt to generate OTA might fail. "),
        )
        for label, argv, failure_prefix in checks:
            try:
                version = subprocess.check_output(argv)
                print(label, version.decode())
            except Exception as e:
                raise DependencyError(failure_prefix + str(e)) from e

    def __init__(self, *, working_dir='output', db_path=None, otatools_dir=None):
        """
        Check the external dependencies, create the working and log
        directories, and create the Jobs table if it does not exist.
        Args:
            working_dir: directory for generated packages, logs and the
                default database file.
            db_path: path of the sqlite database; defaults to
                <working_dir>/ota_database.db.
            otatools_dir: optional otatools directory; used as the working
                directory of the generation process and its bin/ is put
                first on PATH.
        Raise:
            DependencyError: if java or zip is unavailable.
        """
        ProcessesManagement.check_external_dependencies()
        self.working_dir = working_dir
        self.logs_dir = os.path.join(working_dir, 'logs')
        self.otatools_dir = otatools_dir
        os.makedirs(self.working_dir, exist_ok=True)
        os.makedirs(self.logs_dir, exist_ok=True)
        if not db_path:
            db_path = os.path.join(self.working_dir, "ota_database.db")
        self.path = db_path
        with self._cursor() as cursor:
            cursor.execute("""
                CREATE TABLE if not exists Jobs (
                ID TEXT,
                TargetPath TEXT,
                IncrementalPath TEXT,
                Verbose INTEGER,
                Partial TEXT,
                OutputPath TEXT,
                Status TEXT,
                Downgrade INTEGER,
                OtherFlags TEXT,
                STDOUT TEXT,
                STDERR TEXT,
                StartTime INTEGER,
                FinishTime INTEGER
            )
            """)

    @contextlib.contextmanager
    def _cursor(self):
        """
        Yield a cursor on a fresh connection to the job database.
        The transaction is committed on success and rolled back on error; the
        connection is always closed (the sqlite3 connection context manager
        alone does not close it).
        """
        connection = sqlite3.connect(self.path)
        try:
            with connection:
                yield connection.cursor()
        finally:
            connection.close()

    def insert_database(self, job_info):
        """
        Insert the job_info into the database
        Args:
            job_info: JobInfo
        """
        with self._cursor() as cursor:
            cursor.execute("""
                    INSERT INTO Jobs (ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime)
                    VALUES (:id, :target, :incremental, :verbose, :partial, :output, :status, :downgrade, :extra, :stdout, :stderr, :start_time, :finish_time)
                """, job_info.to_sql_form_dict())

    def get_status_by_ID(self, id):
        """
        Return the status of job <id> as a instance of JobInfo
        Args:
            id: string
        Return:
            JobInfo
        Raise:
            TypeError: if no job with this id is stored (there is no row to
                unpack into JobInfo).
        """
        logging.info(id)
        with self._cursor() as cursor:
            cursor.execute(
                'SELECT ' + self._JOB_COLUMNS + ' FROM Jobs WHERE ID=(?)',
                (str(id),))
            row = cursor.fetchone()
        return JobInfo(*row)

    def get_status(self):
        """
        Return the status of all jobs as a list of JobInfo
        Return:
            List[JobInfo]
        """
        with self._cursor() as cursor:
            cursor.execute('SELECT ' + self._JOB_COLUMNS + ' FROM Jobs')
            rows = cursor.fetchall()
        return [JobInfo(*row) for row in rows]

    def update_status(self, id, status, finish_time):
        """
        Change the status and finish time of job <id> in the database
        Args:
            id: string
            status: string
            finish_time: int
        """
        with self._cursor() as cursor:
            cursor.execute("""
                UPDATE Jobs SET Status=(?), FinishTime=(?)
                WHERE ID=(?)
                """,
                           (status, finish_time, id))

    def ota_run(self, command, id, stdout_path, stderr_path):
        """
        Initiate a subprocess to run the ota generation. A background thread
        waits until it finished and updates the record in the database
        ('Finished' on exit code 0, 'Error' otherwise).
        Args:
            command: List[string], the argv to execute
            id: string
            stdout_path: file that receives the process's standard output
            stderr_path: file that receives the process's standard error
        Raise:
            FileNotFoundError: if the executable cannot be found.
            Exception: any other failure to start the process. In both cases
                the job is marked 'Error' before the exception is re-raised.
        """
        # Start from the parent's environment: an empty env would hide the
        # PATH on which java/zip were verified.
        env = os.environ.copy()
        if self.otatools_dir:
            tools_bin = os.path.join(self.otatools_dir, "bin")
            inherited_path = env.get('PATH')
            env['PATH'] = (tools_bin + os.pathsep + inherited_path
                           if inherited_path else tools_bin)
        # TODO(lishutong): Enable user to use self-defined stderr/stdout path
        # The child inherits the descriptors, so the parent's handles can be
        # closed as soon as the process has been started.
        with open(stdout_path, 'w') as fout, open(stderr_path, 'w') as ferr:
            try:
                proc = subprocess.Popen(
                    command, stderr=ferr, stdout=fout, shell=False, env=env,
                    cwd=self.otatools_dir)
            except FileNotFoundError as e:
                logging.error(
                    'ota_from_target_files is not set properly %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise
            except Exception as e:
                logging.error(
                    'Failed to execute ota_from_target_files %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise

        def wait_result():
            exit_code = proc.wait()
            final_status = 'Finished' if exit_code == 0 else 'Error'
            self.update_status(id, final_status, int(time.time()))
        threading.Thread(target=wait_result).start()

    def ota_generate(self, args, id):
        """
        Read in the arguments from the frontend and start running the OTA
        generation process, then update the records in database.
        Format of args:
            output: string, extra_keys: List[string], extra: string,
            isIncremental: bool, isPartial: bool, partial: List[string],
            incremental: string, target: string, verbose: bool
        Only target is mandatory; a missing or empty output defaults to
        <working_dir>/<id>.zip. args['output'] and args['extra'] are updated
        in place with the values actually used.
        args:
            args: dict
            id: string
        Raise:
            FileNotFoundError: if the target (or, for an incremental job, the
                source) file does not exist, or the generator cannot be found.
        """
        # Check essential configuration is properly set
        target = args['target']
        if not os.path.isfile(target):
            raise FileNotFoundError(
                'Target file not found: {}'.format(target))
        is_incremental = bool(args.get('isIncremental'))
        is_partial = bool(args.get('isPartial'))
        verbose = bool(args.get('verbose'))
        incremental = args.get('incremental', '') if is_incremental else ''
        if is_incremental and not os.path.isfile(incremental):
            raise FileNotFoundError(
                'Incremental source file not found: {}'.format(incremental))
        partial = args.get('partial') or [] if is_partial else []
        if not args.get('output'):
            args['output'] = os.path.join(self.working_dir, str(id) + '.zip')

        extra = args.get('extra') or ''
        if args.get('extra_keys'):
            extra = '--' + ' --'.join(args['extra_keys']) + ' ' + extra
            args['extra'] = extra

        command = ['ota_from_target_files']
        if verbose:
            command.append('-v')
        # split() without a separator never yields empty arguments, even for
        # repeated or surrounding whitespace.
        command += extra.split()
        if is_incremental:
            command += ['-i', os.path.realpath(incremental)]
        if is_partial:
            # The partition names are handed over as one space separated value.
            command += ['--partial', ' '.join(partial)]
        command.append(os.path.realpath(target))
        command.append(os.path.realpath(args['output']))

        job_info = JobInfo(id,
                           target=target,
                           incremental=incremental,
                           verbose=verbose,
                           partial=partial,
                           output=args['output'],
                           status='Running',
                           extra=extra,
                           start_time=int(time.time()),
                           stdout=os.path.join(
                               self.logs_dir, 'stdout.' + str(id)),
                           stderr=os.path.join(
                               self.logs_dir, 'stderr.' + str(id)),
                           )
        # Record the job before launching it: the status update issued when
        # the process ends (or fails to start) needs an existing row.
        self.insert_database(job_info)
        self.ota_run(command, id, job_info.stdout, job_info.stderr)
        logging.info(
            'Starting generating OTA package with id %s: \n %s', id, command)