#!/usr/bin/env python
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""deapexer is a tool that prints out content of an APEX.

To print content of an APEX to stdout:
  deapexer list foo.apex

To extract content of an APEX to the given directory:
  deapexer extract foo.apex dest
"""
from __future__ import print_function

import argparse
import apex_manifest
import copy
import enum
import os
import shutil
import sys
import subprocess
import tempfile
import zipfile

# Multiplier used to convert the block counts printed by `debugfs dump_extents`
# into byte offsets and byte lengths.
BLOCK_SIZE = 4096


class ApexImageEntry(object):
  """One entry (regular file, directory or symlink) listed from the payload image."""

  def __init__(self, name, base_dir, permissions, size, ino, extents, is_directory=False,
               is_symlink=False):
    """Stores the attributes parsed from one line of the debugfs listing.

    Args:
      name: entry name, relative to base_dir.
      base_dir: path of the directory that was listed to find this entry.
      permissions: permission bits as an int.
      size: entry size as reported by debugfs (a string when built by Apex._list).
      ino: inode number as reported by debugfs.
      extents: list of (offset, length) tuples; empty when unknown or not applicable.
      is_directory: True if the entry is a directory.
      is_symlink: True if the entry is a symbolic link.
    """
    self._name = name
    self._base_dir = base_dir
    self._permissions = permissions
    self._size = size
    self._is_directory = is_directory
    self._is_symlink = is_symlink
    self._ino = ino
    self._extents = extents

  @property
  def name(self):
    return self._name

  @property
  def full_path(self):
    return os.path.join(self._base_dir, self._name)

  @property
  def is_directory(self):
    return self._is_directory

  @property
  def is_symlink(self):
    return self._is_symlink

  @property
  def is_regular_file(self):
    return not self.is_directory and not self.is_symlink

  @property
  def permissions(self):
    return self._permissions

  @property
  def size(self):
    return self._size

  @property
  def ino(self):
    return self._ino

  @property
  def extents(self):
    return self._extents

  def __str__(self):
    """Returns an `ls -l`-like line: type flag, rwx triplets, size and name."""
    if self._is_directory:
      ret = 'd'
    elif self._is_symlink:
      ret = 'l'
    else:
      ret = '-'

    def mask_as_string(m):
      # Only the three low bits of m are inspected.
      ret = 'r' if m & 4 == 4 else '-'
      ret += 'w' if m & 2 == 2 else '-'
      ret += 'x' if m & 1 == 1 else '-'
      return ret

    ret += mask_as_string(self._permissions >> 6)
    ret += mask_as_string((self._permissions >> 3) & 7)
    ret += mask_as_string(self._permissions & 7)

    # str() keeps this working whether size was stored as text or as an int.
    return ret + ' ' + str(self._size) + ' ' + self._name


class ApexImageDirectory(object):
  """A listed directory of the payload image together with its entries."""

  def __init__(self, path, entries, apex):
    """Args:
      path: directory path inside the image; sub-directory paths are built by
        appending '<name>/' to it.
      entries: iterable of ApexImageEntry; stored sorted by name.
      apex: the owning Apex object, used to list sub-directories and to extract.
    """
    self._path = path
    self._entries = sorted(entries, key=lambda e: e.name)
    self._apex = apex

  def list(self, is_recursive=False):
    """Yields the entries of this directory in name order.

    Args:
      is_recursive: when True, the entries of every sub-directory (other than
        '.' and '..') are yielded right after the sub-directory entry itself.
    """
    for e in self._entries:
      yield e
      # Only descend when the caller asked for it; '.' and '..' would loop forever.
      if is_recursive and e.is_directory and e.name != '.' and e.name != '..':
        for ce in self.enter_subdir(e).list(is_recursive):
          yield ce

  def enter_subdir(self, entry):
    """Returns the ApexImageDirectory for the given directory entry."""
    return self._apex._list(self._path + entry.name + '/')

  def extract(self, dest):
    """Dumps this directory from the image into the host directory dest."""
    self._apex._extract(self._path, dest)


class Apex(object):
  """Context manager giving access to the payload image of an APEX file.

  Entering the context returns the ApexImageDirectory of the image root. The
  payload is unpacked into a temporary directory which is removed when the
  context exits, so directory objects must only be used inside the `with` body.
  """

  def __init__(self, args):
    """Unpacks 'apex_payload.img' from args.apex into a fresh temporary directory.

    Args:
      args: object with `debugfs_path` (path to the debugfs binary) and `apex`
        (path to the APEX file) attributes.
    """
    self._debugfs = args.debugfs_path
    self._apex = args.apex
    self._cache = {}
    self._tempdir = tempfile.mkdtemp()
    try:
      # TODO(b/139125405): support flattened APEXes.
      with zipfile.ZipFile(self._apex, 'r') as zip_ref:
        self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir)
    except Exception:
      # Do not leave the temporary directory behind when the APEX is unreadable.
      self._cleanup()
      raise

  def __del__(self):
    # Fallback for objects that were never used as a context manager.
    self._cleanup()

  def __enter__(self):
    return self._list('./')

  def __exit__(self, exc_type, value, traceback):
    self._cleanup()

  def _cleanup(self):
    """Removes the temporary directory; safe to call more than once."""
    tempdir = getattr(self, '_tempdir', None)
    if tempdir is not None:
      self._tempdir = None
      shutil.rmtree(tempdir, ignore_errors=True)

  def _run_debugfs(self, request):
    """Runs one debugfs request against the payload image.

    Returns:
      A (returncode, stdout, stderr) tuple; both streams are text.
    """
    process = subprocess.Popen([self._debugfs, '-R', request, self._payload],
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               universal_newlines=True)
    stdout, stderr = process.communicate()
    return process.returncode, stdout, stderr

  def _list(self, path):
    """Returns the ApexImageDirectory for path, listing it with debugfs on first use."""
    if path in self._cache:
      return self._cache[path]

    _, stdout, _ = self._run_debugfs('ls -l -p %s' % path)
    entries = []
    for line in stdout.split('\n'):
      # An entry line splits on '/' into exactly 8 fields. Field 1 is the inode,
      # field 2 the octal mode, field 5 the name and field 6 the size; the other
      # fields are not used here. Anything else (blank lines, messages) is skipped.
      parts = line.split('/')
      if len(parts) != 8:
        continue
      name = parts[5]
      if not name:
        continue
      ino = parts[1]
      bits = parts[2]
      size = parts[6]
      is_symlink = bits[1] == '2'
      is_directory = bits[1] == '4'

      extents = []
      if not is_symlink and not is_directory:
        extents = self._dump_extents(ino, size)

      entries.append(ApexImageEntry(name, base_dir=path, permissions=int(bits[3:], 8), size=size,
                                    is_directory=is_directory, is_symlink=is_symlink, ino=ino,
                                    extents=extents))

    directory = ApexImageDirectory(path, entries, self)
    self._cache[path] = directory
    return directory

  def _dump_extents(self, ino, size):
    """Returns the (byte offset, byte length) extents of a regular file.

    Args:
      ino: inode number of the file.
      size: file size in bytes, as text or int.

    Returns:
      A list of tuples, or [] when the location could not be retrieved reliably.
    """
    _, stdout, _ = self._run_debugfs('dump_extents <%s>' % ino)
    # Output of dump_extents for an inode fragmented in 3 blocks (length and addresses represent
    # block-sized sections):
    # Level Entries       Logical      Physical Length Flags
    # 0/ 0   1/  3     0 -    0    18 -    18      1
    # 0/ 0   2/  3     1 -   15    20 -    34     15
    # 0/ 0   3/  3    16 - 1863    37 -  1884   1848
    extents = []
    try:  # dump_extents sometimes has an unexpected output
      left_length = int(size)
      # The first line contains only columns names.
      for line in stdout.splitlines()[1:]:
        tokens = line.split()
        offset = int(tokens[7]) * BLOCK_SIZE
        # The last extent may cover more bytes than the file still needs.
        length = min(int(tokens[-1]) * BLOCK_SIZE, left_length)
        left_length -= length
        extents.append((offset, length))
    except (ValueError, IndexError):
      return []  # [] means that we failed to retrieve the file location successfully
    if left_length != 0:  # dump_extents sometimes fails to display "hole" blocks
      return []
    return extents

  def _extract(self, path, dest):
    """Recursively dumps path from the image into dest; debugfs errors go to stderr."""
    returncode, _, stderr = self._run_debugfs('rdump %s %s' % (path, dest))
    if returncode != 0:
      print(stderr, file=sys.stderr)


def _RunOnDecompressed(args, run):
  """Decompresses args.apex to a temporary file and calls run on a copy of args.

  The copy has `apex` pointing at the decompressed file, so the caller's args
  object is left untouched. The temporary file is deleted once run returns.
  """
  with tempfile.TemporaryDirectory() as temp:
    decompressed_apex = os.path.join(temp, 'temp.apex')
    decompress(args.apex, decompressed_apex)
    decompressed_args = copy.copy(args)
    decompressed_args.apex = decompressed_apex
    run(decompressed_args)


def RunList(args):
  """Prints the path of every non-directory entry of the APEX, one per line.

  Args:
    args.apex: path to the APEX (compressed APEXes are decompressed first).
    args.debugfs_path: path to the debugfs binary.
    args.size: when true, each line is prefixed with the entry size.
    args.extents: when true, each line is suffixed with the entry extents.
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    _RunOnDecompressed(args, RunList)
    return

  with Apex(args) as apex:
    for e in apex.list(is_recursive=True):
      if e.is_directory:
        continue
      res = ''
      if args.size:
        res += str(e.size) + ' '
      res += e.full_path
      if args.extents:
        res += ' [' + '-'.join(str(x) for x in e.extents) + ']'
      print(res)


def RunExtract(args):
  """Extracts the payload of the APEX into args.dest, creating it if needed.

  Args:
    args.apex: path to the APEX (compressed APEXes are decompressed first).
    args.debugfs_path: path to the debugfs binary.
    args.dest: destination directory.
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    _RunOnDecompressed(args, RunExtract)
    return

  with Apex(args) as apex:
    if not os.path.exists(args.dest):
      os.makedirs(args.dest, mode=0o755)
    apex.extract(args.dest)
    # lost+found is not part of the APEX content; it may be missing if the dump failed.
    lost_found = os.path.join(args.dest, 'lost+found')
    if os.path.isdir(lost_found):
      shutil.rmtree(lost_found)


class ApexType(enum.Enum):
  INVALID = 0
  UNCOMPRESSED = 1
  COMPRESSED = 2


def GetType(apex_path):
  """Classifies the zip at apex_path by which top-level member it contains.

  Returns:
    ApexType.UNCOMPRESSED if only 'apex_payload.img' is present,
    ApexType.COMPRESSED if only 'original_apex' is present,
    ApexType.INVALID if both or neither are present.
  """
  with zipfile.ZipFile(apex_path, 'r') as zip_file:
    names = zip_file.namelist()
    has_payload = 'apex_payload.img' in names
    has_original_apex = 'original_apex' in names
    if has_payload and has_original_apex:
      return ApexType.INVALID
    if has_payload:
      return ApexType.UNCOMPRESSED
    if has_original_apex:
      return ApexType.COMPRESSED
    return ApexType.INVALID


def RunInfo(args):
  """Prints the APEX type (with args.print_type) or its manifest as JSON."""
  if args.print_type:
    res = GetType(args.apex)
    if res == ApexType.INVALID:
      print(args.apex + ' is not a valid apex', file=sys.stderr)
      sys.exit(1)
    print(res.name)
  else:
    manifest = apex_manifest.fromApex(args.apex)
    print(apex_manifest.toJsonString(manifest))


def RunDecompress(args):
  """RunDecompress takes path to compressed APEX and decompresses it to
  produce the original uncompressed APEX at given output path

  See apex_compression_tool.py#RunCompress for details on compressed APEX
  structure.

  Args:
      args.input: file path to compressed APEX
      args.output: file path to where decompressed APEX will be placed
  """
  compressed_apex_fp = args.input
  decompressed_apex_fp = args.output
  return decompress(compressed_apex_fp, decompressed_apex_fp)


def decompress(compressed_apex_fp, decompressed_apex_fp):
  """Extracts the 'original_apex' member of a compressed APEX to a file.

  Exits the process with status 1 if the output path already exists or if the
  input has no 'original_apex' member.

  Args:
    compressed_apex_fp: file path to the compressed APEX.
    decompressed_apex_fp: file path where the original APEX will be written.
  """
  if os.path.exists(decompressed_apex_fp):
    print("Output path '" + decompressed_apex_fp + "' already exists", file=sys.stderr)
    sys.exit(1)

  with zipfile.ZipFile(compressed_apex_fp, 'r') as zip_obj:
    if 'original_apex' not in zip_obj.namelist():
      print(compressed_apex_fp + ' is not a compressed APEX. Missing '
            "'original_apex' file inside it.", file=sys.stderr)
      sys.exit(1)
    # Rename original_apex file to what user provided as output filename
    original_apex_info = zip_obj.getinfo('original_apex')
    original_apex_info.filename = os.path.basename(decompressed_apex_fp)
    # Extract the original_apex as desired name
    zip_obj.extract(original_apex_info,
                    path=os.path.dirname(decompressed_apex_fp))


def main(argv):
  """Parses argv (without the program name) and runs the selected sub-command."""
  parser = argparse.ArgumentParser()

  debugfs_default = None
  if 'ANDROID_HOST_OUT' in os.environ:
    debugfs_default = '%s/bin/debugfs_static' % os.environ['ANDROID_HOST_OUT']
  parser.add_argument('--debugfs_path', help='The path to debugfs binary',
                      default=debugfs_default)

  subparsers = parser.add_subparsers(required=True, dest='cmd')

  parser_list = subparsers.add_parser('list', help='prints content of an APEX to stdout')
  parser_list.add_argument('apex', type=str, help='APEX file')
  parser_list.add_argument('--size', help='also show the size of the files',
                           action="store_true")
  parser_list.add_argument('--extents', help='also show the location of the files',
                           action="store_true")
  parser_list.set_defaults(func=RunList)

  parser_extract = subparsers.add_parser('extract',
                                         help='extracts content of an APEX to the given '
                                              'directory')
  parser_extract.add_argument('apex', type=str, help='APEX file')
  parser_extract.add_argument('dest', type=str, help='Directory to extract content of APEX to')
  parser_extract.set_defaults(func=RunExtract)

  parser_info = subparsers.add_parser('info', help='prints APEX manifest')
  parser_info.add_argument('apex', type=str, help='APEX file')
  parser_info.add_argument('--print-type',
                           help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)',
                           action='store_true')
  parser_info.set_defaults(func=RunInfo)

  # Handle sub-command "decompress"
  parser_decompress = subparsers.add_parser('decompress',
                                            help='decompresses a compressed '
                                                 'APEX')
  parser_decompress.add_argument('--input', type=str, required=True,
                                 help='path to compressed APEX file that '
                                      'will be decompressed')
  parser_decompress.add_argument('--output', type=str, required=True,
                                 help='output file path where '
                                      'decompressed APEX will be placed')
  parser_decompress.set_defaults(func=RunDecompress)

  args = parser.parse_args(argv)

  # Only the sub-commands that read the payload image need debugfs.
  debugfs_required_for_cmd = ['list', 'extract']
  if args.cmd in debugfs_required_for_cmd and not args.debugfs_path:
    print('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set',
          file=sys.stderr)
    sys.exit(1)

  args.func(args)


if __name__ == '__main__':
  main(sys.argv[1:])