1import subprocess
2import os
3import pipes
4import threading
5from dataclasses import dataclass, asdict, field
6import logging
7import sqlite3
8import time
9
10
@dataclass
class JobInfo:
    """
    Record describing one OTA generation job.

    The field order matters: database rows are unpacked positionally into
    this class (``JobInfo(*row)``), so the first thirteen fields must stay in
    the same order as the columns selected from the ``Jobs`` table.
    ``isPartial`` and ``isIncremental`` are derived in ``__post_init__`` and
    are not stored in the database.
    """
    id: str
    target: str
    incremental: str = ''
    verbose: bool = False
    partial: list[str] = field(default_factory=list)
    output: str = ''
    status: str = 'Running'
    downgrade: bool = False
    extra: str = ''
    stdout: str = ''
    stderr: str = ''
    start_time: int = 0
    finish_time: int = 0
    isPartial: bool = False
    isIncremental: bool = False

    def __post_init__(self):
        """
        Normalize values that may arrive in their database representation.

        ``verbose`` and ``downgrade`` are coerced to ``bool`` (the database
        stores them as integers) and ``partial`` is turned into a list when
        it is given as a comma-separated string. ``isIncremental`` and
        ``isPartial`` are switched on when the matching field is non-empty.
        """
        self.verbose = bool(self.verbose)
        self.downgrade = bool(self.downgrade)
        if self.incremental:
            self.isIncremental = True
        if self.partial:
            self.isPartial = True
            if isinstance(self.partial, str):
                self.partial = self.partial.split(',')
        else:
            # Covers '', None and any other empty value.
            self.partial = []

    def to_sql_form_dict(self):
        """
        Convert this instance to a dict, which can be later used to insert into
        the SQL database.

        ``partial`` is joined into a comma-separated string, and ``verbose``
        and ``downgrade`` become 0/1 integers. All other fields are copied
        unchanged (including ``isPartial`` and ``isIncremental``).

        Format:
            id: string, target: string, incremental: string, verbose: int,
            partial: string, output: string, status: string,
            downgrade: int, extra: string, stdout: string, stderr: string,
            start_time: int, finish_time: int
        Return:
            dict
        """
        sql_form_dict = asdict(self)
        sql_form_dict['partial'] = ','.join(sql_form_dict['partial'])
        for flag in ('verbose', 'downgrade'):
            sql_form_dict[flag] = 1 if sql_form_dict[flag] else 0
        return sql_form_dict

    def to_dict_basic(self):
        """
        Convert the instance to a dict, which includes the file name of target
        (and of the incremental source, when there is one).

        The file name is whatever follows the last '/' in the stored path.
        Return:
            dict
        """
        basic_info = asdict(self)
        basic_info['target_name'] = self.target.split('/')[-1]
        if self.isIncremental:
            basic_info['incremental_name'] = self.incremental.split('/')[-1]
        return basic_info

    @staticmethod
    def _read_log(path, missing_text):
        """Return the content of the log file at path, or missing_text if it does not exist."""
        try:
            with open(path, 'r') as log_file:
                return log_file.read()
        except FileNotFoundError:
            return missing_text

    def to_dict_detail(self, target_lib, offset=0):
        """
        Convert this instance into a dict, which includes some detailed information
        of the target/source build, i.e. build version and file name.

        The ``stdout`` and ``stderr`` entries are replaced by the content of
        the log files they point to. Each log is read independently, so one
        missing file does not hide the content of the other.

        Args:
            target_lib: object providing ``get_build_by_path(path)``, whose
                result exposes ``file_name`` and ``build_version``.
            offset: currently unused; kept so existing callers keep working.
        Return:
            dict
        """
        detail_info = asdict(self)
        detail_info['stdout'] = self._read_log(
            self.stdout, 'NO STD OUTPUT IS FOUND')
        detail_info['stderr'] = self._read_log(
            self.stderr, 'NO STD ERROR IS FOUND')
        target_info = target_lib.get_build_by_path(self.target)
        detail_info['target_name'] = target_info.file_name
        detail_info['target_build_version'] = target_info.build_version
        if self.incremental:
            incremental_info = target_lib.get_build_by_path(
                self.incremental)
            detail_info['incremental_name'] = incremental_info.file_name
            detail_info['incremental_build_version'] = incremental_info.build_version
        return detail_info
98
99
class DependencyError(Exception):
    """Raised when an external command needed for OTA generation cannot be run."""
102
103
class ProcessesManagement:
    """
    Manage OTA generation jobs: launch the ``ota_from_target_files``
    subprocess for each job and keep a record of every job in a SQLite
    database.
    """

    # Column list shared by every SELECT so that a row always unpacks into
    # JobInfo in field order.
    _JOB_COLUMNS = (
        "ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, "
        "Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, FinishTime"
    )

    @staticmethod
    def check_external_dependencies():
        """
        Verify that the ``java`` and ``zip`` commands can be executed.

        The version reported by each command is printed.
        Raises:
            DependencyError: if either command cannot be run or its output
                cannot be decoded.
        """
        probes = (
            (["java", "--version"], "Java version:",
             "java not found in PATH. Attempt to generate OTA might fail. "),
            (["zip", "-v"], "Zip version:",
             "zip command not found in PATH. Attempt to generate OTA might fail. "),
        )
        for argv, label, failure in probes:
            try:
                version = subprocess.check_output(argv)
                print(label, version.decode())
            except Exception as e:
                raise DependencyError(failure + str(e)) from e

    def __init__(self, *, working_dir='output', db_path=None, otatools_dir=None):
        """
        Check external dependencies, create the working and log directories
        and create the ``Jobs`` table if it does not exist yet.

        Args:
            working_dir: directory for generated packages; logs go into its
                ``logs`` sub-directory.
            db_path: path of the SQLite database file. Defaults to
                ``ota_database.db`` inside working_dir.
            otatools_dir: optional otatools directory. When set, its ``bin``
                sub-directory is put first on the PATH of launched processes
                and the directory is used as their working directory.
        Raises:
            DependencyError: if ``java`` or ``zip`` is not usable.
        """
        ProcessesManagement.check_external_dependencies()
        self.working_dir = working_dir
        self.logs_dir = os.path.join(working_dir, 'logs')
        self.otatools_dir = otatools_dir
        os.makedirs(self.working_dir, exist_ok=True)
        os.makedirs(self.logs_dir, exist_ok=True)
        if not db_path:
            db_path = os.path.join(self.working_dir, "ota_database.db")
        self.path = db_path
        self._execute("""
            CREATE TABLE if not exists Jobs (
            ID TEXT,
            TargetPath TEXT,
            IncrementalPath TEXT,
            Verbose INTEGER,
            Partial TEXT,
            OutputPath TEXT,
            Status TEXT,
            Downgrade INTEGER,
            OtherFlags TEXT,
            STDOUT TEXT,
            STDERR TEXT,
            StartTime INTEGER,
            FinishTime INTEGER
        )
        """)

    def _execute(self, sql, parameters=()):
        """
        Run one statement on a short-lived connection and return all rows.

        Using the connection as a context manager only commits (or rolls
        back) the transaction; it does not close the connection, so it is
        closed explicitly here.
        """
        connection = sqlite3.connect(self.path)
        try:
            with connection:
                return connection.execute(sql, parameters).fetchall()
        finally:
            connection.close()

    def insert_database(self, job_info):
        """
        Insert the job_info into the database
        Args:
            job_info: JobInfo
        """
        self._execute("""
            INSERT INTO Jobs (ID, TargetPath, IncrementalPath, Verbose, Partial, OutputPath, Status, Downgrade, OtherFlags, STDOUT, STDERR, StartTime, Finishtime)
            VALUES (:id, :target, :incremental, :verbose, :partial, :output, :status, :downgrade, :extra, :stdout, :stderr, :start_time, :finish_time)
            """, job_info.to_sql_form_dict())

    def get_status_by_ID(self, id):
        """
        Return the status of job <id> as a instance of JobInfo
        Args:
            id: string
        Return:
            JobInfo
        Raises:
            TypeError: if no job with this id is stored (the missing row
                cannot be unpacked into a JobInfo).
        """
        logging.info(id)
        rows = self._execute(
            "SELECT " + self._JOB_COLUMNS + " FROM Jobs WHERE ID=(?)",
            (str(id),))
        row = rows[0] if rows else None
        return JobInfo(*row)

    def get_status(self):
        """
        Return the status of all jobs as a list of JobInfo
        Return:
            List[JobInfo]
        """
        rows = self._execute("SELECT " + self._JOB_COLUMNS + " FROM Jobs")
        return [JobInfo(*row) for row in rows]

    def update_status(self, id, status, finish_time):
        """
        Change the status and finish time of job <id> in the database
        Args:
            id: string
            status: string
            finish_time: int
        """
        self._execute(
            "UPDATE Jobs SET Status=(?), FinishTime=(?) WHERE ID=(?)",
            (status, finish_time, id))

    def ota_run(self, command, id, stdout_path, stderr_path):
        """
        Start a subprocess running the ota generation and return immediately.
        A background thread waits for the process to exit and then records
        'Finished' (exit code 0) or 'Error' (anything else) in the database.

        Args:
            command: list of strings, the program and its arguments
            id: string, id of the job whose record is updated
            stdout_path: file that receives the standard output of the process
            stderr_path: file that receives the standard error of the process
        Raises:
            Whatever ``subprocess.Popen`` raises when the process cannot be
            started; the job is marked 'Error' before re-raising.
        """
        # Start from the current environment so the child can still locate
        # java, zip and the OTA tool itself when otatools_dir is not set.
        env = os.environ.copy()
        if self.otatools_dir:
            env['PATH'] = os.pathsep.join(
                [os.path.join(self.otatools_dir, "bin"),
                 env.get("PATH", os.defpath)])
        # TODO(lishutong): Enable user to use self-defined stderr/stdout path
        # The child receives its own copies of the log file handles, so the
        # parent's copies can be closed as soon as Popen returns.
        with open(stdout_path, 'w') as fout, open(stderr_path, 'w') as ferr:
            try:
                proc = subprocess.Popen(
                    command, stdout=fout, stderr=ferr, shell=False, env=env,
                    cwd=self.otatools_dir)
            except FileNotFoundError as e:
                logging.error(
                    'ota_from_target_files is not set properly %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise
            except Exception as e:
                logging.error('Failed to execute ota_from_target_files %s', e)
                self.update_status(id, 'Error', int(time.time()))
                raise

        def wait_result():
            exit_code = proc.wait()
            final_status = 'Finished' if exit_code == 0 else 'Error'
            self.update_status(id, final_status, int(time.time()))
        threading.Thread(
            target=wait_result, name='ota-wait-' + str(id)).start()

    def ota_generate(self, args, id):
        """
        Read in the arguments from the frontend and start running the OTA
        generation process, then update the records in database.

        The job record is inserted before the process is launched, so that
        the status update made when the process ends (or fails to start)
        always finds the row. ``args`` is updated in place: ``output`` is
        filled in when absent and ``extra`` is set to the combined flags.

        Format of args:
            output: string, extra_keys: List[string], extra: string,
            isIncremental: bool, isPartial: bool, partial: List[string],
            incremental: string, target: string, verbose: bool
        args:
            args: dict
            id: string
        Raises:
            FileNotFoundError: if the target file, or the incremental source
                of an incremental job, does not exist.
        """
        is_incremental = bool(args.get('isIncremental'))
        is_partial = bool(args.get('isPartial'))
        # Check essential configuration is properly set
        if not os.path.isfile(args['target']):
            raise FileNotFoundError(
                'Target file not found: {}'.format(args['target']))
        if is_incremental and not os.path.isfile(args['incremental']):
            raise FileNotFoundError(
                'Incremental source file not found: {}'.format(
                    args['incremental']))
        if 'output' not in args:
            args['output'] = os.path.join(self.working_dir, str(id) + '.zip')

        extra = args.get('extra') or ''
        if args.get('extra_keys'):
            extra = '--' + ' --'.join(args['extra_keys']) + ' ' + extra
        args['extra'] = extra

        command = ['ota_from_target_files']
        if args.get('verbose'):
            command.append('-v')
        # split() without a separator drops the empty strings that repeated
        # spaces would otherwise turn into empty command-line arguments.
        command += extra.split()
        if is_incremental:
            command += ['-i', os.path.realpath(args['incremental'])]
        if is_partial:
            # The partition names are passed as one space-separated argument.
            command += ['--partial', ' '.join(args['partial'])]
        command.append(os.path.realpath(args['target']))
        command.append(os.path.realpath(args['output']))

        job_info = JobInfo(
            id,
            target=args['target'],
            incremental=args['incremental'] if is_incremental else '',
            verbose=args.get('verbose', False),
            partial=args['partial'] if is_partial else [],
            output=args['output'],
            status='Running',
            extra=extra,
            start_time=int(time.time()),
            stdout=os.path.join(self.logs_dir, 'stdout.' + str(id)),
            stderr=os.path.join(self.logs_dir, 'stderr.' + str(id)),
        )
        self.insert_database(job_info)
        self.ota_run(command, id, job_info.stdout, job_info.stderr)
        logging.info(
            'Starting generating OTA package with id %s: \n %s', id, command)
310