"""Track Android target build zips and their metadata in a SQLite database."""

from contextlib import closing
from dataclasses import dataclass, asdict, field
from typing import Optional
import sqlite3
import time
import logging
import os
import zipfile
import re
import json


# Patterns are compiled once at import time.  Raw byte literals are required:
# a non-raw b'\.' is an invalid escape sequence and warns on modern Python.
_PATTERN_BUILD_ID = re.compile(rb'(?<=ro\.build\.id\=).+')
_PATTERN_BUILD_VERSION = re.compile(
    rb'(?<=ro\.build\.version\.incremental\=).+')
_PATTERN_BUILD_FLAVOR = re.compile(rb'(?<=ro\.build\.flavor\=).+')


def _extract_prop(pattern, lines):
    """
    Return the text matched by `pattern` in the first matching line.

    Args:
        pattern: a compiled bytes regex
        lines: an iterable of bytes lines
    Return:
        The decoded match without trailing CR/LF, or '' if no line matches.
    """
    for line in lines:
        match = pattern.search(line)
        if match:
            # '.+' stops at '\n' but not at '\r', so files with CRLF line
            # endings would otherwise leak a trailing '\r' into the value.
            return match.group(0).rstrip(b'\r\n').decode('utf-8')
    return ''


class BuildFileInvalidError(Exception):
    """Raised when a build zip lacks a file needed to describe the build."""


@dataclass
class BuildInfo:
    """
    A class for Android build information.
    """
    file_name: str
    path: str
    time: int
    build_id: str = ''
    build_version: str = ''
    build_flavor: str = ''
    partitions: list[str] = field(default_factory=list)

    def analyse_buildprop(self):
        """
        Analyse the build's version info and partitions included
        Then write them into the build_info

        Reads 'SYSTEM/build.prop' and 'META/ab_partitions.txt' from the zip
        at self.path. A property that is absent from build.prop is stored
        as ''.
        Raise:
            BuildFileInvalidError: if either file is missing from the zip
        """
        with zipfile.ZipFile(self.path) as build:
            try:
                with build.open('SYSTEM/build.prop', 'r') as build_prop:
                    prop_lines = build_prop.readlines()
                with build.open('META/ab_partitions.txt', 'r') as partition_info:
                    partition_lines = partition_info.readlines()
            except KeyError as e:
                # ZipFile.open raises KeyError for a missing archive member.
                raise BuildFileInvalidError(
                    "Invalid build due to " + str(e)) from e

        self.build_id = _extract_prop(_PATTERN_BUILD_ID, prop_lines)
        self.build_version = _extract_prop(_PATTERN_BUILD_VERSION, prop_lines)
        self.build_flavor = _extract_prop(_PATTERN_BUILD_FLAVOR, prop_lines)
        # Rebuild the list instead of appending so that analysing twice does
        # not duplicate entries; blank lines are not partitions.
        decoded = (line.decode('utf-8').rstrip() for line in partition_lines)
        self.partitions = [name for name in decoded if name]

    def to_sql_form_dict(self):
        """
        Return the fields as a dict suitable for named SQL parameters.

        The Partitions column is TEXT but self.partitions is a list, so
        turn the list into a string joined by ',', for example:
        ['system', 'vendor'] => 'system,vendor'
        """
        sql_form_dict = asdict(self)
        sql_form_dict['partitions'] = ','.join(sql_form_dict['partitions'])
        return sql_form_dict

    def to_dict(self):
        """
        Return as a normal dict.
        """
        return asdict(self)


class TargetLib:
    """
    A class that manages the builds in database.
    """

    def __init__(self, working_dir="target", db_path=None):
        """
        Create a build table if not existing
        Args:
            working_dir: where build files are stored
            db_path: the database file; defaults to 'ota_database.db'
                inside working_dir
        """
        self.working_dir = working_dir
        if db_path is None:
            db_path = os.path.join(working_dir, "ota_database.db")
        self.db_path = db_path
        with self._connection() as connect, connect:
            connect.execute("""
                CREATE TABLE if not exists Builds (
                    FileName TEXT,
                    UploadTime INTEGER,
                    Path TEXT,
                    BuildID TEXT,
                    BuildVersion TEXT,
                    BuildFlavor TEXT,
                    Partitions TEXT
                )
            """)

    def _connection(self):
        """
        Open the database and return a context manager that closes it.

        Using a sqlite3 connection directly in a `with` statement only
        commits or rolls back; it never closes the connection. Callers
        combine both: `with self._connection() as connect, connect:`.
        """
        return closing(sqlite3.connect(self.db_path))

    def new_build(self, filename, path):
        """
        Insert a new build into the database
        Args:
            filename: the name of the file
            path: the relative path of the file
        Raise:
            BuildFileInvalidError: if the zip is missing required files
        """
        build_info = BuildInfo(filename, path, int(time.time()))
        build_info.analyse_buildprop()
        # Ignore name specified by user, instead use a standard format
        build_info.path = os.path.join(self.working_dir, "{}-{}-{}.zip".format(
            build_info.build_flavor, build_info.build_id, build_info.build_version))
        if path != build_info.path:
            os.rename(path, build_info.path)
        params = build_info.to_sql_form_dict()
        with self._connection() as connect, connect:
            cursor = connect.cursor()
            # Drop any earlier record of the same file before inserting.
            # NOTE(review): records are matched on FileName AND Path, so the
            # same file recorded under a different FileName is kept as a
            # separate row - confirm whether that is intended.
            cursor.execute("""
            DELETE FROM Builds WHERE FileName=:file_name and Path=:path
            """, params)
            cursor.execute("""
            INSERT INTO Builds (FileName, UploadTime, Path, BuildID, BuildVersion, BuildFlavor, Partitions)
            VALUES (:file_name, :time, :path, :build_id, :build_version, :build_flavor, :partitions)
            """, params)

    def new_build_from_dir(self):
        """
        Update the database using files under self.working_dir

        If working_dir is a directory, every zip file directly inside it is
        added; if working_dir is itself a '.zip' file, that file is added.
        Return:
            A list of all builds in the database, see get_builds
        """
        build_dir = self.working_dir
        if os.path.isdir(build_dir):
            for build_name in os.listdir(build_dir):
                path = os.path.join(build_dir, build_name)
                if build_name.endswith(".zip") and zipfile.is_zipfile(path):
                    self.new_build(build_name, path)
        elif os.path.isfile(build_dir) and build_dir.endswith(".zip"):
            self.new_build(os.path.split(build_dir)[-1], build_dir)
        return self.get_builds()

    def sql_to_buildinfo(self, row):
        """
        Turn a database row into a BuildInfo
        Args:
            row: (FileName, Path, UploadTime, BuildID, BuildVersion,
                BuildFlavor, Partitions), Partitions being joined by ','
        Return:
            A BuildInfo; an empty Partitions column gives an empty list
        """
        # ''.split(',') is [''], which is not an empty partition list.
        partitions = row[6].split(',') if row[6] else []
        return BuildInfo(*row[:6], partitions)

    def get_builds(self):
        """
        Get a list of builds in the database
        Return:
            A list of BuildInfo, one for each row of the Builds table
        """
        with self._connection() as connect:
            cursor = connect.cursor()
            cursor.execute("""
            SELECT FileName, Path, UploadTime, BuildID, BuildVersion, BuildFlavor, Partitions
            FROM Builds""")
            return [self.sql_to_buildinfo(row) for row in cursor.fetchall()]

    def get_build_by_path(self, path) -> Optional[BuildInfo]:
        """
        Get a build in the database by its path
        Args:
            path: the Path value stored for the build
        Return:
            A BuildInfo for the first matching row, or None if no build is
            stored under that path
        """
        with self._connection() as connect:
            cursor = connect.cursor()
            cursor.execute("""
            SELECT FileName, Path, UploadTime, BuildID, BuildVersion, BuildFlavor, Partitions
            FROM Builds WHERE Path==(?)
            """, (path, ))
            row = cursor.fetchone()
        if row is None:
            return None
        return self.sql_to_buildinfo(row)