1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42import os
43import re
44import subprocess
45import sys
46import zipfile
47
48import common
49
50if sys.hexversion < 0x02070000:
51  print >> sys.stderr, "Python 2.7 or newer is required."
52  sys.exit(1)
53
54
55# Work around a bug in Python's zipfile module that prevents opening of zipfiles
56# if any entry has an extra field of between 1 and 3 bytes (which is common with
57# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
58# contains the bug) with an empty version (since we don't need to decode the
59# extra field anyway).
60# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
61# Python 3.5.0 alpha 1.
62class MyZipInfo(zipfile.ZipInfo):
63  def _decodeExtra(self):
64    pass
65zipfile.ZipInfo = MyZipInfo
66
67OPTIONS = common.OPTIONS
68
69OPTIONS.text = False
70OPTIONS.compare_with = None
71OPTIONS.local_cert_dirs = ("vendor", "build")
72
73PROBLEMS = []
74PROBLEM_PREFIX = []
75
76def AddProblem(msg):
77  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
78def Push(msg):
79  PROBLEM_PREFIX.append(msg)
80def Pop():
81  PROBLEM_PREFIX.pop()
82
83
84def Banner(msg):
85  print "-" * 70
86  print "  ", msg
87  print "-" * 70
88
89
90def GetCertSubject(cert):
91  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
92                 stdin=subprocess.PIPE,
93                 stdout=subprocess.PIPE)
94  out, err = p.communicate(cert)
95  if err and not err.strip():
96    return "(error reading cert subject)"
97  for line in out.split("\n"):
98    line = line.strip()
99    if line.startswith("Subject:"):
100      return line[8:].strip()
101  return "(unknown cert subject)"
102
103
104class CertDB(object):
105  def __init__(self):
106    self.certs = {}
107
108  def Add(self, cert, name=None):
109    if cert in self.certs:
110      if name:
111        self.certs[cert] = self.certs[cert] + "," + name
112    else:
113      if name is None:
114        name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
115                                         GetCertSubject(cert))
116      self.certs[cert] = name
117
118  def Get(self, cert):
119    """Return the name for a given cert."""
120    return self.certs.get(cert, None)
121
122  def FindLocalCerts(self):
123    to_load = []
124    for top in OPTIONS.local_cert_dirs:
125      for dirpath, _, filenames in os.walk(top):
126        certs = [os.path.join(dirpath, i)
127                 for i in filenames if i.endswith(".x509.pem")]
128        if certs:
129          to_load.extend(certs)
130
131    for i in to_load:
132      f = open(i)
133      cert = common.ParseCertificate(f.read())
134      f.close()
135      name, _ = os.path.splitext(i)
136      name, _ = os.path.splitext(name)
137      self.Add(cert, name)
138
139ALL_CERTS = CertDB()
140
141
142def CertFromPKCS7(data, filename):
143  """Read the cert out of a PKCS#7-format file (which is what is
144  stored in a signed .apk)."""
145  Push(filename + ":")
146  try:
147    p = common.Run(["openssl", "pkcs7",
148                    "-inform", "DER",
149                    "-outform", "PEM",
150                    "-print_certs"],
151                   stdin=subprocess.PIPE,
152                   stdout=subprocess.PIPE)
153    out, err = p.communicate(data)
154    if err and not err.strip():
155      AddProblem("error reading cert:\n" + err)
156      return None
157
158    cert = common.ParseCertificate(out)
159    if not cert:
160      AddProblem("error parsing cert output")
161      return None
162    return cert
163  finally:
164    Pop()
165
166
167class APK(object):
168  def __init__(self, full_filename, filename):
169    self.filename = filename
170    self.certs = None
171    self.shared_uid = None
172    self.package = None
173
174    Push(filename+":")
175    try:
176      self.RecordCerts(full_filename)
177      self.ReadManifest(full_filename)
178    finally:
179      Pop()
180
181  def RecordCerts(self, full_filename):
182    out = set()
183    try:
184      f = open(full_filename)
185      apk = zipfile.ZipFile(f, "r")
186      pkcs7 = None
187      for info in apk.infolist():
188        if info.filename.startswith("META-INF/") and \
189           (info.filename.endswith(".DSA") or info.filename.endswith(".RSA")):
190          pkcs7 = apk.read(info.filename)
191          cert = CertFromPKCS7(pkcs7, info.filename)
192          out.add(cert)
193          ALL_CERTS.Add(cert)
194      if not pkcs7:
195        AddProblem("no signature")
196    finally:
197      f.close()
198      self.certs = frozenset(out)
199
200  def ReadManifest(self, full_filename):
201    p = common.Run(["aapt", "dump", "xmltree", full_filename,
202                    "AndroidManifest.xml"],
203                   stdout=subprocess.PIPE)
204    manifest, err = p.communicate()
205    if err:
206      AddProblem("failed to read manifest")
207      return
208
209    self.shared_uid = None
210    self.package = None
211
212    for line in manifest.split("\n"):
213      line = line.strip()
214      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
215      if m:
216        name = m.group(1)
217        if name == "android:sharedUserId":
218          if self.shared_uid is not None:
219            AddProblem("multiple sharedUserId declarations")
220          self.shared_uid = m.group(2)
221        elif name == "package":
222          if self.package is not None:
223            AddProblem("multiple package declarations")
224          self.package = m.group(2)
225
226    if self.package is None:
227      AddProblem("no package declaration")
228
229
230class TargetFiles(object):
231  def __init__(self):
232    self.max_pkg_len = 30
233    self.max_fn_len = 20
234    self.apks = None
235    self.apks_by_basename = None
236    self.certmap = None
237
238  def LoadZipFile(self, filename):
239    # First read the APK certs file to figure out whether there are compressed
240    # APKs in the archive. If we do have compressed APKs in the archive, then we
241    # must decompress them individually before we perform any analysis.
242
243    # This is the list of wildcards of files we extract from |filename|.
244    apk_extensions = ['*.apk']
245
246    self.certmap, compressed_extension = common.ReadApkCerts(
247        zipfile.ZipFile(filename, "r"))
248    if compressed_extension:
249      apk_extensions.append("*.apk" + compressed_extension)
250
251    d = common.UnzipTemp(filename, apk_extensions)
252    self.apks = {}
253    self.apks_by_basename = {}
254    for dirpath, _, filenames in os.walk(d):
255      for fn in filenames:
256        # Decompress compressed APKs before we begin processing them.
257        if compressed_extension and fn.endswith(compressed_extension):
258          # First strip the compressed extension from the file.
259          uncompressed_fn = fn[:-len(compressed_extension)]
260
261          # Decompress the compressed file to the output file.
262          common.Gunzip(os.path.join(dirpath, fn),
263                        os.path.join(dirpath, uncompressed_fn))
264
265          # Finally, delete the compressed file and use the uncompressed file
266          # for further processing. Note that the deletion is not strictly
267          # required, but is done here to ensure that we're not using too much
268          # space in the temporary directory.
269          os.remove(os.path.join(dirpath, fn))
270          fn = uncompressed_fn
271
272        if fn.endswith(".apk"):
273          fullname = os.path.join(dirpath, fn)
274          displayname = fullname[len(d)+1:]
275          apk = APK(fullname, displayname)
276          self.apks[apk.filename] = apk
277          self.apks_by_basename[os.path.basename(apk.filename)] = apk
278
279          self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
280          self.max_fn_len = max(self.max_fn_len, len(apk.filename))
281
282  def CheckSharedUids(self):
283    """Look for any instances where packages signed with different
284    certs request the same sharedUserId."""
285    apks_by_uid = {}
286    for apk in self.apks.itervalues():
287      if apk.shared_uid:
288        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
289
290    for uid in sorted(apks_by_uid):
291      apks = apks_by_uid[uid]
292      for apk in apks[1:]:
293        if apk.certs != apks[0].certs:
294          break
295      else:
296        # all packages have the same set of certs; this uid is fine.
297        continue
298
299      AddProblem("different cert sets for packages with uid %s" % (uid,))
300
301      print "uid %s is shared by packages with different cert sets:" % (uid,)
302      for apk in apks:
303        print "%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename)
304        for cert in apk.certs:
305          print "   ", ALL_CERTS.Get(cert)
306      print
307
308  def CheckExternalSignatures(self):
309    for apk_filename, certname in self.certmap.iteritems():
310      if certname == "EXTERNAL":
311        # Apps marked EXTERNAL should be signed with the test key
312        # during development, then manually re-signed after
313        # predexopting.  Consider it an error if this app is now
314        # signed with any key that is present in our tree.
315        apk = self.apks_by_basename[apk_filename]
316        name = ALL_CERTS.Get(apk.cert)
317        if not name.startswith("unknown "):
318          Push(apk.filename)
319          AddProblem("hasn't been signed with EXTERNAL cert")
320          Pop()
321
322  def PrintCerts(self):
323    """Display a table of packages grouped by cert."""
324    by_cert = {}
325    for apk in self.apks.itervalues():
326      for cert in apk.certs:
327        by_cert.setdefault(cert, []).append((apk.package, apk))
328
329    order = [(-len(v), k) for (k, v) in by_cert.iteritems()]
330    order.sort()
331
332    for _, cert in order:
333      print "%s:" % (ALL_CERTS.Get(cert),)
334      apks = by_cert[cert]
335      apks.sort()
336      for _, apk in apks:
337        if apk.shared_uid:
338          print "  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
339                                        self.max_pkg_len, apk.package,
340                                        apk.shared_uid)
341        else:
342          print "  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package)
343      print
344
345  def CompareWith(self, other):
346    """Look for instances where a given package that exists in both
347    self and other have different certs."""
348
349    all_apks = set(self.apks.keys())
350    all_apks.update(other.apks.keys())
351
352    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
353
354    by_certpair = {}
355
356    for i in all_apks:
357      if i in self.apks:
358        if i in other.apks:
359          # in both; should have same set of certs
360          if self.apks[i].certs != other.apks[i].certs:
361            by_certpair.setdefault((other.apks[i].certs,
362                                    self.apks[i].certs), []).append(i)
363        else:
364          print "%s [%s]: new APK (not in comparison target_files)" % (
365              i, self.apks[i].filename)
366      else:
367        if i in other.apks:
368          print "%s [%s]: removed APK (only in comparison target_files)" % (
369              i, other.apks[i].filename)
370
371    if by_certpair:
372      AddProblem("some APKs changed certs")
373      Banner("APK signing differences")
374      for (old, new), packages in sorted(by_certpair.items()):
375        for i, o in enumerate(old):
376          if i == 0:
377            print "was", ALL_CERTS.Get(o)
378          else:
379            print "   ", ALL_CERTS.Get(o)
380        for i, n in enumerate(new):
381          if i == 0:
382            print "now", ALL_CERTS.Get(n)
383          else:
384            print "   ", ALL_CERTS.Get(n)
385        for i in sorted(packages):
386          old_fn = other.apks[i].filename
387          new_fn = self.apks[i].filename
388          if old_fn == new_fn:
389            print "  %-*s  [%s]" % (max_pkg_len, i, old_fn)
390          else:
391            print "  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
392                                                  old_fn, new_fn)
393        print
394
395
396def main(argv):
397  def option_handler(o, a):
398    if o in ("-c", "--compare_with"):
399      OPTIONS.compare_with = a
400    elif o in ("-l", "--local_cert_dirs"):
401      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
402    elif o in ("-t", "--text"):
403      OPTIONS.text = True
404    else:
405      return False
406    return True
407
408  args = common.ParseOptions(argv, __doc__,
409                             extra_opts="c:l:t",
410                             extra_long_opts=["compare_with=",
411                                              "local_cert_dirs="],
412                             extra_option_handler=option_handler)
413
414  if len(args) != 1:
415    common.Usage(__doc__)
416    sys.exit(1)
417
418  ALL_CERTS.FindLocalCerts()
419
420  Push("input target_files:")
421  try:
422    target_files = TargetFiles()
423    target_files.LoadZipFile(args[0])
424  finally:
425    Pop()
426
427  compare_files = None
428  if OPTIONS.compare_with:
429    Push("comparison target_files:")
430    try:
431      compare_files = TargetFiles()
432      compare_files.LoadZipFile(OPTIONS.compare_with)
433    finally:
434      Pop()
435
436  if OPTIONS.text or not compare_files:
437    Banner("target files")
438    target_files.PrintCerts()
439  target_files.CheckSharedUids()
440  target_files.CheckExternalSignatures()
441  if compare_files:
442    if OPTIONS.text:
443      Banner("comparison files")
444      compare_files.PrintCerts()
445    target_files.CompareWith(compare_files)
446
447  if PROBLEMS:
448    print "%d problem(s) found:\n" % (len(PROBLEMS),)
449    for p in PROBLEMS:
450      print p
451    return 1
452
453  return 0
454
455
456if __name__ == '__main__':
457  try:
458    r = main(sys.argv[1:])
459    sys.exit(r)
460  except common.ExternalError as e:
461    print
462    print "   ERROR: %s" % (e,)
463    print
464    sys.exit(1)
465  finally:
466    common.Cleanup()
467