• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""deapexer is a tool that prints out content of an APEX.
17
18To print content of an APEX to stdout:
19  deapexer list foo.apex
20
21To extract content of an APEX to the given directory:
22  deapexer extract foo.apex dest
23"""
24from __future__ import print_function
25
26import argparse
27import apex_manifest
28import enum
29import os
30import shutil
31import sys
32import subprocess
33import tempfile
34import zipfile
35
# Multiplier, in bytes, applied to the block-based offsets and lengths printed
# by debugfs 'dump_extents' to turn them into byte values.
# NOTE(review): assumes the payload image uses 4096-byte blocks -- confirm.
BLOCK_SIZE = 4096
37
class ApexImageEntry(object):
  """One entry (regular file, directory or symlink) of an APEX payload image.

  All values are stored as given by the caller and exposed through read-only
  properties.
  """

  def __init__(self, name, base_dir, permissions, size, ino, extents, is_directory=False,
               is_symlink=False):
    """Creates an entry.

    Args:
      name: name of the entry inside its directory.
      base_dir: path of the directory containing the entry; joined with name
        to build full_path.
      permissions: integer permission bits (e.g. 0o644).
      size: size of the entry, stored as given (string or integer).
      ino: inode identifier of the entry, stored as given.
      extents: list describing where the entry's data is located; stored as
        given.
      is_directory: True when the entry is a directory.
      is_symlink: True when the entry is a symbolic link.
    """
    self._name = name
    self._base_dir = base_dir
    self._permissions = permissions
    self._size = size
    self._is_directory = is_directory
    self._is_symlink = is_symlink
    self._ino = ino
    self._extents = extents

  @property
  def name(self):
    return self._name

  @property
  def full_path(self):
    return os.path.join(self._base_dir, self._name)

  @property
  def is_directory(self):
    return self._is_directory

  @property
  def is_symlink(self):
    return self._is_symlink

  @property
  def is_regular_file(self):
    return not self.is_directory and not self.is_symlink

  @property
  def permissions(self):
    return self._permissions

  @property
  def size(self):
    return self._size

  @property
  def ino(self):
    return self._ino

  @property
  def extents(self):
    return self._extents

  def __str__(self):
    """Returns an 'ls -l'-like line: type flag + rwx triplets, size and name."""
    if self._is_directory:
      kind = 'd'
    elif self._is_symlink:
      kind = 'l'
    else:
      kind = '-'

    def mask_as_string(m):
      # Only the three lowest bits of m are inspected.
      return ''.join(flag if m & bit else '-'
                     for flag, bit in (('r', 4), ('w', 2), ('x', 1)))

    mode = (mask_as_string(self._permissions >> 6)
            + mask_as_string((self._permissions >> 3) & 7)
            + mask_as_string(self._permissions & 7))

    # str() keeps this working whether size was given as a string or an int.
    return kind + mode + ' ' + str(self._size) + ' ' + self._name
107
108
class ApexImageDirectory(object):
  """A directory of an APEX payload image together with its entries.

  Entries are kept sorted by name. Sub-directories are resolved lazily through
  the owning apex object's _list() method.
  """

  def __init__(self, path, entries, apex):
    """Creates a directory view.

    Args:
      path: path of this directory inside the image; it is concatenated
        directly with entry names, so it is expected to end with '/'.
      entries: iterable of entries (objects with name and is_directory).
      apex: object providing _list(path) and _extract(path, dest).
    """
    self._path = path
    self._entries = sorted(entries, key=lambda e: e.name)
    self._apex = apex

  def list(self, is_recursive=False):
    """Yields the entries of this directory in name order.

    Args:
      is_recursive: when True, the content of every sub-directory (except the
        '.' and '..' entries) is yielded right after the sub-directory entry
        itself, depth first. When False only direct entries are yielded.
    """
    for e in self._entries:
      yield e
      # '.' and '..' must never be followed, otherwise the walk would not end.
      if is_recursive and e.is_directory and e.name not in ('.', '..'):
        for ce in self.enter_subdir(e).list(is_recursive):
          yield ce

  def enter_subdir(self, entry):
    """Returns the directory object for the given sub-directory entry."""
    return self._apex._list(self._path + entry.name + '/')

  def extract(self, dest):
    """Asks the owning apex object to extract this directory into dest."""
    self._apex._extract(self._path, dest)
129
130
class Apex(object):
  """Gives access to the content of the payload image of an APEX file.

  On construction 'apex_payload.img' is extracted from the APEX (a zip file)
  into a temporary directory; the image is then inspected by running the
  debugfs binary on it. Used as a context manager, it yields the root
  directory ('./') of the image.
  """

  def __init__(self, args):
    """Extracts the payload image of args.apex into a temporary directory.

    Args:
      args: object with attributes debugfs_path (path to the debugfs binary)
        and apex (path to the APEX file).
    """
    self._debugfs = args.debugfs_path
    self._apex = args.apex
    self._tempdir = tempfile.mkdtemp()
    # TODO(b/139125405): support flattened APEXes.
    with zipfile.ZipFile(self._apex, 'r') as zip_ref:
      self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir)
    # Maps a directory path inside the image to its ApexImageDirectory.
    self._cache = {}

  def __del__(self):
    # __init__ may have failed before _tempdir was assigned, and __del__ may be
    # invoked explicitly more than once, so never assume the attribute is set.
    tempdir = getattr(self, '_tempdir', None)
    if tempdir:
      self._tempdir = None
      shutil.rmtree(tempdir, ignore_errors=True)

  def __enter__(self):
    return self._list('./')

  def __exit__(self, type, value, traceback):
    # The temporary directory is deliberately kept until the object is
    # destroyed: directories returned by __enter__ list sub-directories lazily.
    pass

  def _run_debugfs(self, request):
    """Runs one debugfs request on the payload image.

    Returns:
      A (returncode, stdout, stderr) tuple; both streams are text.
    """
    process = subprocess.Popen([self._debugfs, '-R', request, self._payload],
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               universal_newlines=True)
    stdout, stderr = process.communicate()
    return process.returncode, stdout, stderr

  @staticmethod
  def _parse_extents(dump_output, size, block_size=None):
    """Converts the output of debugfs 'dump_extents' into byte extents.

    Output of dump_extents for an inode fragmented in 3 blocks (length and
    addresses represent block-sized sections):
      Level Entries       Logical      Physical Length Flags
      0/ 0   1/  3     0 -     0    18 -    18      1
      0/ 0   2/  3     1 -    15    20 -    34     15
      0/ 0   3/  3    16 -  1863    37 -  1884   1848

    Args:
      dump_output: text printed by dump_extents (first line is the header).
      size: size of the file in bytes (string or integer).
      block_size: bytes per block; defaults to the module-level BLOCK_SIZE.

    Returns:
      A list of (offset, length) tuples in bytes, or [] when the location of
      the file could not be retrieved (unexpected or empty output, or extents
      that do not add up to size).
    """
    if block_size is None:
      block_size = BLOCK_SIZE
    extents = []
    try:  # dump_extents sometimes has an unexpected output
      left_length = int(size)
      # The first line contains only column names; slicing (rather than
      # pop(0)) also tolerates a completely empty output.
      for row in dump_output.splitlines()[1:]:
        tokens = row.split()
        offset = int(tokens[7]) * block_size
        length = min(int(tokens[-1]) * block_size, left_length)
        left_length -= length
        extents.append((offset, length))
    except (ValueError, IndexError):
      return []
    if left_length != 0:  # dump_extents sometimes fails to display "hole" blocks
      return []
    return extents

  def _list(self, path):
    """Returns the ApexImageDirectory for the given path inside the image.

    Results are memoized per path, so debugfs runs at most once per directory.
    Note that the memoized directories reference this object, which means the
    temporary directory is released by the cyclic garbage collector rather
    than as soon as the last external reference goes away.
    """
    if path in self._cache:
      return self._cache[path]
    _, listing, _ = self._run_debugfs('ls -l -p %s' % path)
    entries = []
    for line in listing.split('\n'):
      if not line:
        continue
      # Only records that split into exactly 8 '/'-separated fields are used:
      # field 1 is the inode, 2 the mode, 5 the name and 6 the size.
      parts = line.split('/')
      if len(parts) != 8:
        continue
      name = parts[5]
      if not name:
        continue
      ino = parts[1]
      bits = parts[2]
      size = parts[6]
      # The second character of the mode tells the type of the entry; the
      # characters from index 3 on are the octal permission bits.
      is_symlink = bits[1] == '2'
      is_directory = bits[1] == '4'

      extents = []
      if not is_symlink and not is_directory:
        _, dump, _ = self._run_debugfs('dump_extents <%s>' % ino)
        extents = self._parse_extents(dump, size)

      entries.append(ApexImageEntry(name, base_dir=path, permissions=int(bits[3:], 8), size=size,
                                    is_directory=is_directory, is_symlink=is_symlink, ino=ino,
                                    extents=extents))

    directory = ApexImageDirectory(path, entries, self)
    self._cache[path] = directory
    return directory

  def _extract(self, path, dest):
    """Copies path (recursively, via debugfs 'rdump') from the image into dest.

    A debugfs failure is reported on stderr but does not raise.
    """
    returncode, _, stderr = self._run_debugfs('rdump %s %s' % (path, dest))
    if returncode != 0:
      print(stderr, file=sys.stderr)
215
216
def RunList(args):
  """Prints the path of every non-directory entry of an APEX to stdout.

  A compressed APEX is first decompressed into a temporary directory; args.apex
  is then overwritten with the decompressed file and the listing is run on it.

  Args:
    args: parsed arguments; reads apex, size and extents (plus whatever Apex()
      needs). With size set, each line is prefixed by the entry size; with
      extents set, the entry's extents are appended between square brackets.
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    with tempfile.TemporaryDirectory() as scratch_dir:
      inner_apex = os.path.join(scratch_dir, 'temp.apex')
      decompress(args.apex, inner_apex)
      args.apex = inner_apex

      RunList(args)
    return

  with Apex(args) as root_dir:
    for entry in root_dir.list(is_recursive=True):
      if entry.is_directory:
        continue
      prefix = entry.size + ' ' if args.size else ''
      line = prefix + entry.full_path
      if args.extents:
        line += ' [' + '-'.join(map(str, entry.extents)) + ']'
      print(line)
238
239
def RunExtract(args):
  """Extracts the content of an APEX payload into the directory args.dest.

  A compressed APEX is first decompressed into a temporary directory; args.apex
  is then overwritten with the decompressed file and the extraction is run on
  it. The destination directory is created (mode 0o755) when it does not exist.

  Args:
    args: parsed arguments; reads apex and dest (plus whatever Apex() needs).
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    with tempfile.TemporaryDirectory() as temp:
      decompressed_apex = os.path.join(temp, "temp.apex")
      decompress(args.apex, decompressed_apex)
      args.apex = decompressed_apex

      RunExtract(args)
      return

  with Apex(args) as apex:
    if not os.path.exists(args.dest):
      os.makedirs(args.dest, mode=0o755)
    apex.extract(args.dest)
    # The image's "lost+found" directory is not wanted in the output. It is
    # only removed when it is actually there: an image without it (or a failed
    # extraction) must not end in a FileNotFoundError.
    lost_found = os.path.join(args.dest, "lost+found")
    if os.path.isdir(lost_found):
      shutil.rmtree(lost_found)
255
256
@enum.unique
class ApexType(enum.Enum):
  """Kind of APEX file, as reported by GetType()."""
  # Neither, or both, of 'apex_payload.img' and 'original_apex' are present.
  INVALID = 0
  # The zip contains 'apex_payload.img' only.
  UNCOMPRESSED = 1
  # The zip contains 'original_apex' only.
  COMPRESSED = 2
261
262
def GetType(apex_path):
  """Classifies the APEX at apex_path by looking at the names inside the zip.

  Returns:
    ApexType.UNCOMPRESSED when only 'apex_payload.img' is present,
    ApexType.COMPRESSED when only 'original_apex' is present, and
    ApexType.INVALID when both or neither are present.
  """
  with zipfile.ZipFile(apex_path, 'r') as archive:
    members = archive.namelist()
  payload_present = 'apex_payload.img' in members
  original_present = 'original_apex' in members
  # Exactly one of the two marker files must exist for the APEX to be valid.
  if payload_present == original_present:
    return ApexType.INVALID
  return ApexType.UNCOMPRESSED if payload_present else ApexType.COMPRESSED
275
276
def RunInfo(args):
  """Prints information about an APEX to stdout.

  With args.print_type set, prints the name of the APEX type, or an error
  message followed by exit status 1 when the APEX is invalid. Otherwise prints
  the APEX manifest as JSON.

  Args:
    args: parsed arguments; reads apex and print_type.
  """
  if not args.print_type:
    manifest = apex_manifest.fromApex(args.apex)
    print(apex_manifest.toJsonString(manifest))
    return

  apex_type = GetType(args.apex)
  if apex_type == ApexType.INVALID:
    print(args.apex + ' is not a valid apex')
    sys.exit(1)
  print(apex_type.name)
287
288
def RunDecompress(args):
  """Decompresses a compressed APEX into the original uncompressed APEX.

  Handler of the "decompress" sub-command; the work is done by decompress().

  See apex_compression_tool.py#RunCompress for details on compressed APEX
  structure.

  Args:
      args.input: file path to compressed APEX
      args.output: file path to where decompressed APEX will be placed
  """
  return decompress(args.input, args.output)
303
def decompress(compressed_apex_fp, decompressed_apex_fp):
  """Extracts 'original_apex' from a compressed APEX to the given file path.

  Prints a message and exits with status 1 when the output path already exists
  or when the input does not contain an 'original_apex' file.

  Args:
    compressed_apex_fp: file path to the compressed APEX (a zip file).
    decompressed_apex_fp: file path where the decompressed APEX is written.
  """
  if os.path.exists(decompressed_apex_fp):
    print("Output path '" + decompressed_apex_fp + "' already exists")
    sys.exit(1)

  with zipfile.ZipFile(compressed_apex_fp, 'r') as archive:
    if 'original_apex' not in archive.namelist():
      print(compressed_apex_fp
            + " is not a compressed APEX. Missing 'original_apex' file inside it.")
      sys.exit(1)
    out_dir, out_name = os.path.split(decompressed_apex_fp)
    # Renaming the member before extracting makes the zip module write it
    # under the file name requested by the caller.
    member = archive.getinfo('original_apex')
    member.filename = out_name
    archive.extract(member, path=out_dir)
320
321
def main(argv):
  """Parses the command line and runs the requested sub-command.

  Sub-commands: list, extract, info and decompress. 'list' and 'extract' need
  a debugfs binary: either --debugfs_path is given, or it defaults to
  $ANDROID_HOST_OUT/bin/debugfs_static when ANDROID_HOST_OUT is set; otherwise
  an error is printed to stderr and the process exits with status 1.

  Args:
    argv: list of command-line arguments, without the program name.
  """
  parser = argparse.ArgumentParser()

  debugfs_default = None
  if 'ANDROID_HOST_OUT' in os.environ:
    debugfs_default = '%s/bin/debugfs_static' % os.environ['ANDROID_HOST_OUT']
  parser.add_argument('--debugfs_path', help='The path to debugfs binary', default=debugfs_default)

  subparsers = parser.add_subparsers(required=True, dest='cmd')

  parser_list = subparsers.add_parser('list', help='prints content of an APEX to stdout')
  parser_list.add_argument('apex', type=str, help='APEX file')
  parser_list.add_argument('--size', help='also show the size of the files', action="store_true")
  parser_list.add_argument('--extents', help='also show the location of the files', action="store_true")
  parser_list.set_defaults(func=RunList)

  parser_extract = subparsers.add_parser('extract', help='extracts content of an APEX to the given '
                                                         'directory')
  parser_extract.add_argument('apex', type=str, help='APEX file')
  parser_extract.add_argument('dest', type=str, help='Directory to extract content of APEX to')
  parser_extract.set_defaults(func=RunExtract)

  parser_info = subparsers.add_parser('info', help='prints APEX manifest')
  parser_info.add_argument('apex', type=str, help='APEX file')
  parser_info.add_argument('--print-type',
                           help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)',
                           action='store_true')
  parser_info.set_defaults(func=RunInfo)

  # Handle sub-command "decompress"
  parser_decompress = subparsers.add_parser('decompress',
                                            help='decompresses a compressed '
                                                 'APEX')
  parser_decompress.add_argument('--input', type=str, required=True,
                                 help='path to compressed APEX file that '
                                      'will be decompressed')
  # --output names the resulting file (it must not exist yet), not a directory.
  parser_decompress.add_argument('--output', type=str, required=True,
                                 help='output file path where '
                                      'decompressed APEX will be written')
  parser_decompress.set_defaults(func=RunDecompress)

  args = parser.parse_args(argv)

  # Only these sub-commands inspect the payload image through debugfs.
  debugfs_required_for_cmd = ['list', 'extract']
  if args.cmd in debugfs_required_for_cmd and not args.debugfs_path:
    print('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set',
          file=sys.stderr)
    sys.exit(1)

  args.func(args)
372
373
# Script entry point: pass the command-line arguments (without the program
# name) to main().
if __name__ == '__main__':
  main(sys.argv[1:])
376