1from dataclasses import dataclass, asdict, field
2import sqlite3
3import time
4import logging
5import os
6import zipfile
7import re
8import json
9
10
class BuildFileInvalidError(Exception):
    """
    Raised when a build zip lacks a file required for analysis.
    """
13
14
@dataclass
class BuildInfo:
    """
    A class for Android build information.

    Attributes:
        file_name: name of the build file as supplied by the caller.
        path: location of the build zip on disk.
        time: integer timestamp attached to the build.
        build_id: value of ``ro.build.id`` read from the build.
        build_version: value of ``ro.build.version.incremental``.
        build_flavor: value of ``ro.build.flavor``.
        partitions: names listed in ``META/ab_partitions.txt``.
    """
    file_name: str
    path: str
    time: int
    build_id: str = ''
    build_version: str = ''
    build_flavor: str = ''
    partitions: list[str] = field(default_factory=list)

    def analyse_buildprop(self):
        """
        Analyse the build's version info and partitions included
        Then write them into the build_info

        Reads ``SYSTEM/build.prop`` and ``META/ab_partitions.txt`` from the
        zip at ``self.path``. A property that is absent from build.prop is
        stored as an empty string. The partition list is replaced, not
        extended, so calling this method again does not duplicate entries.

        Raises:
            BuildFileInvalidError: if either file is missing from the zip.
        """
        def extract_info(pattern, lines):
            # Return the decoded text of the first line matching the pattern,
            # or '' when no line matches.
            for line in lines:
                match = pattern.search(line)
                if match:
                    # '.' also matches '\r', so drop a CRLF remainder.
                    return match.group(0).decode('utf-8').rstrip()
            return ''

        # Raw byte literals keep '\.' a regex escape instead of an invalid
        # string escape sequence.
        pattern_id = re.compile(rb'(?<=ro\.build\.id=).+')
        pattern_version = re.compile(
            rb'(?<=ro\.build\.version\.incremental=).+')
        pattern_flavor = re.compile(rb'(?<=ro\.build\.flavor=).+')

        with zipfile.ZipFile(self.path) as build:
            try:
                with build.open('SYSTEM/build.prop', 'r') as build_prop:
                    prop_lines = build_prop.readlines()
                with build.open('META/ab_partitions.txt', 'r') as partition_info:
                    partition_lines = partition_info.readlines()
            except KeyError as e:
                # ZipFile.open raises KeyError for a missing archive member.
                raise BuildFileInvalidError(
                    "Invalid build due to " + str(e)) from e

        # Assign only after both files were read, so a failure above leaves
        # the instance untouched.
        self.build_id = extract_info(pattern_id, prop_lines)
        self.build_version = extract_info(pattern_version, prop_lines)
        self.build_flavor = extract_info(pattern_flavor, prop_lines)
        names = (line.decode('utf-8').rstrip() for line in partition_lines)
        # Skip blank lines so they do not become empty partition names.
        self.partitions = [name for name in names if name]

    def to_sql_form_dict(self):
        """
        Because sqlite can only store text but self.partitions is a list
        Turn the list into a string joined by ',', for example:
        ['system', 'vendor'] => 'system,vendor'

        Return:
            A dict of all fields, with 'partitions' as a single string.
        """
        sql_form_dict = asdict(self)
        sql_form_dict['partitions'] = ','.join(sql_form_dict['partitions'])
        return sql_form_dict

    def to_dict(self):
        """
        Return as a normal dict.
        """
        return asdict(self)
77
78
class TargetLib:
    """
    A class that manages the builds in database.

    Every database access opens its own sqlite3 connection and closes it
    when done.
    """

    def __init__(self, working_dir="target", db_path=None):
        """
        Create a build table if not existing
        Args:
            working_dir: directory that holds the build files
            db_path: path of the sqlite database file; defaults to
                "ota_database.db" inside working_dir
        """
        self.working_dir = working_dir
        if db_path is None:
            db_path = os.path.join(working_dir, "ota_database.db")
        self.db_path = db_path
        self._run("""
                CREATE TABLE if not exists Builds (
                FileName TEXT,
                UploadTime INTEGER,
                Path TEXT,
                BuildID TEXT,
                BuildVersion TEXT,
                BuildFlavor TEXT,
                Partitions TEXT
            )
            """)

    def _run(self, sql, params=()):
        """
        Execute one statement in its own transaction and return all rows.
        """
        connect = sqlite3.connect(self.db_path)
        try:
            # The connection's context manager only commits or rolls back;
            # the explicit close() below is what releases the connection.
            with connect:
                return connect.execute(sql, params).fetchall()
        finally:
            connect.close()

    def new_build(self, filename, path):
        """
        Insert a new build into the database
        The file is moved to "<flavor>-<id>-<version>.zip" inside the
        working directory, and any row with the same FileName and Path is
        replaced.
        Args:
            filename: the name of the file
            path: the relative path of the file
        """
        build_info = BuildInfo(filename, path, int(time.time()))
        build_info.analyse_buildprop()
        # Ignore name specified by user, instead use a standard format
        build_info.path = os.path.join(self.working_dir, "{}-{}-{}.zip".format(
            build_info.build_flavor, build_info.build_id, build_info.build_version))
        if path != build_info.path:
            # os.replace overwrites an existing target on every platform,
            # whereas os.rename fails on Windows when the target exists.
            os.replace(path, build_info.path)
        row = build_info.to_sql_form_dict()
        connect = sqlite3.connect(self.db_path)
        try:
            # DELETE and INSERT share one transaction so a failed insert
            # does not drop the previous row.
            with connect:
                # NOTE(review): rows are de-duplicated on (FileName, Path),
                # so one file registered under two names yields two rows
                # with the same Path - confirm this is intended.
                connect.execute("""
                DELETE FROM Builds WHERE FileName=:file_name and Path=:path
                """, row)
                connect.execute("""
                INSERT INTO Builds (FileName, UploadTime, Path, BuildID, BuildVersion, BuildFlavor, Partitions)
                VALUES (:file_name, :time, :path, :build_id, :build_version, :build_flavor, :partitions)
                """, row)
        finally:
            connect.close()

    def new_build_from_dir(self):
        """
        Update the database using the files under the working directory
        Every "*.zip" entry that is a valid zip file is registered with
        new_build. If the working directory is itself a ".zip" file, that
        single file is registered instead.
        Return:
            The list of builds in the database after the update
        """
        build_dir = self.working_dir
        if os.path.isdir(build_dir):
            for build_name in os.listdir(build_dir):
                path = os.path.join(build_dir, build_name)
                if build_name.endswith(".zip") and zipfile.is_zipfile(path):
                    self.new_build(build_name, path)
        elif os.path.isfile(build_dir) and build_dir.endswith(".zip"):
            self.new_build(os.path.split(build_dir)[-1], build_dir)
        return self.get_builds()

    def sql_to_buildinfo(self, row):
        """
        Turn a database row into a BuildInfo
        Args:
            row: a sequence of
                (FileName, Path, UploadTime, BuildID, BuildVersion,
                BuildFlavor, Partitions)
        """
        # ''.split(',') would give [''], so an empty (or NULL) column maps
        # to an empty partition list instead.
        partitions = row[6].split(',') if row[6] else []
        return BuildInfo(*row[:6], partitions)

    def get_builds(self):
        """
        Get a list of builds in the database
        Return:
            A list of BuildInfo objects, one per row of the Builds table
        """
        rows = self._run("""
            SELECT FileName, Path, UploadTime, BuildID, BuildVersion, BuildFlavor, Partitions
            FROM Builds""")
        return [self.sql_to_buildinfo(row) for row in rows]

    def get_build_by_path(self, path):
        """
        Get a build in the database by its path
        Return:
            The BuildInfo of the first row whose Path equals path, or None
            when no row matches
        """
        rows = self._run("""
            SELECT FileName, Path, UploadTime, BuildID, BuildVersion, BuildFlavor, Partitions
            FROM Builds WHERE Path==(?)
            """, (path, ))
        if not rows:
            return None
        return self.sql_to_buildinfo(rows[0])
183