1from optparse import OptionParser
2from optparse import Option, OptionValueError
3import os
4import mini_parser
5import policy
6from policy import MatchPathPrefix
7import re
8import sys
9
# When True, the script dumps every collected domain context (via
# PrintScontexts) before running the tests.
DEBUG = False

"""
Use file_contexts and policy to verify Treble requirements
are not violated.
"""
###
# Differentiate between domains that are part of the core Android platform and
# domains introduced by vendors

# App domains that belong to the core platform.
coreAppdomain = set([
    'bluetooth',
    'ephemeral_app',
    'isolated_app',
    'nfc',
    'platform_app',
    'priv_app',
    'radio',
    'shared_relro',
    'shell',
    'system_app',
    'untrusted_app',
    'untrusted_app_25',
])

# Domains that GetCoreDomains() skips when it classifies entrypoint paths:
# the non-app entries listed here plus every core app domain above.
coredomainWhitelist = set([
    'adbd',
    'kernel',
    'postinstall',
    'postinstall_dexopt',
    'recovery',
    'system_server',
    'vendor_init',
]) | coreAppdomain
43
class scontext:
    """Per-domain record of the facts the Treble tests look at.

    A fresh instance has every flag cleared and every collection empty; the
    Get* helpers in this file fill the fields in afterwards.
    """

    def __init__(self):
        # Collections first: each instance gets its own containers.
        self.attributes = set()       # attribute names attached to the domain
        self.entrypoints = list()     # entrypoint type names (as strings)
        self.entrypointpaths = list() # file_contexts paths for the entrypoints
        # Where the domain is launched from, derived from entrypointpaths.
        self.fromSystem = self.fromVendor = False
        # Classification flags derived from the attribute set.
        self.coredomain = self.appdomain = False
53
def PrintScontexts():
    """Debug helper: print every entry of alldomains to stdout.

    Domains are listed in sorted name order; each one is followed by its
    scontext fields, one tab-indented "name=value" line per field, and then
    its entrypoint paths, one double-tab-indented line per path.
    """
    scalar_fields = ("coredomain", "appdomain", "fromSystem", "fromVendor",
                     "attributes", "entrypoints")
    for name in sorted(alldomains):
        ctx = alldomains[name]
        # Single-argument print(...) emits the same text whether print is a
        # statement or a function.
        print(name)
        for field in scalar_fields:
            print("\t" + field + "=" + str(getattr(ctx, field)))
        print("\tentrypointpaths=")
        if ctx.entrypointpaths is None:
            continue
        for entry in ctx.entrypointpaths:
            print("\t\t" + str(entry))
68
# Module-wide state shared by the collectors and the tests below.
alldomains = dict()      # domain name -> scontext
coredomains = set()      # names of domains carrying "coredomain"
appdomains = set()       # names of domains carrying "appdomain"
vendordomains = set()
pol = None               # policy under test; assigned by the script entry point

# compat vars
alltypes = set()
oldalltypes = set()
pubtypes = set()
compatMapping = None

# Distinguish between PRODUCT_FULL_TREBLE and PRODUCT_FULL_TREBLE_OVERRIDE
FakeTreble = False
83
def GetAllDomains(pol):
    """Register a fresh scontext in alldomains for each queried domain.

    The names come from pol.QueryTypeAttribute("domain", True); an existing
    entry with the same name is replaced.
    """
    global alldomains
    alldomains.update(
        (name, scontext()) for name in pol.QueryTypeAttribute("domain", True))
88
def GetAppDomains():
    """Mark every domain that carries the "appdomain" attribute.

    Sets scontext.appdomain on the matching entries of alldomains and adds
    their names to appdomains. Relies on the attribute sets having been
    populated beforehand.
    """
    global appdomains
    global alldomains
    for name, ctx in alldomains.items():
        # The application of the "appdomain" attribute is trusted because core
        # selinux policy contains neverallow rules that enforce that only zygote
        # and runas spawned processes may transition to processes that have
        # the appdomain attribute.
        if "appdomain" not in ctx.attributes:
            continue
        ctx.appdomain = True
        appdomains.add(name)
100
def GetCoreDomains():
    """Flag core domains and record where each domain is launched from.

    For every entry of alldomains:
      * if it carries the "coredomain" attribute, set scontext.coredomain and
        add the name to coredomains;
      * unless the domain is whitelisted, is an app domain, or has no
        entrypoint paths, set fromSystem / fromVendor according to the
        prefixes its entrypoint paths match.
    """
    global alldomains
    global coredomains
    for name, ctx in alldomains.items():
        # TestCoredomainViolations will verify if coredomain was incorrectly
        # applied.
        if "coredomain" in ctx.attributes:
            ctx.coredomain = True
            coredomains.add(name)

        # The remainder checks whether domains are executed off of /system or
        # /vendor; whitelisted domains are exempt.
        # TODO, add checks to prevent app domains from being incorrectly
        # labeled as coredomain. Apps don't have entrypoints as they're always
        # dynamically transitioned to by zygote.
        if (name in coredomainWhitelist or name in appdomains
                or not ctx.entrypointpaths):
            continue

        for entry in ctx.entrypointpaths:
            # Processes with entrypoint on /system
            launched_from_system = (
                (MatchPathPrefix(entry, "/system")
                 and not MatchPathPrefix(entry, "/system/vendor"))
                or MatchPathPrefix(entry, "/init")
                or MatchPathPrefix(entry, "/charger"))
            if launched_from_system:
                ctx.fromSystem = True

            # Processes with entrypoint on /vendor or /system/vendor
            launched_from_vendor = (
                MatchPathPrefix(entry, "/vendor")
                or MatchPathPrefix(entry, "/system/vendor"))
            if launched_from_vendor:
                ctx.fromVendor = True
131
132###
133# Add the entrypoint type and path(s) to each domain.
134#
def GetDomainEntrypoints(pol):
    """Record entrypoint types and their file_contexts paths per domain.

    Walks the expanded "file"/"entrypoint" rules of pol. For each rule whose
    source is a known domain, the target type is appended to that domain's
    entrypoints, and whatever pol.QueryFc() returns for the type is appended
    to its entrypointpaths.
    """
    global alldomains
    rules = pol.QueryExpandedTERule(tclass={"file"}, perms={"entrypoint"})
    for rule in rules:
        if rule.sctx not in alldomains:
            continue
        domain = alldomains[rule.sctx]
        domain.entrypoints.append(str(rule.tctx))
        # postinstall_file represents a special case specific to A/B OTAs.
        # Update_engine mounts a partition and relabels it postinstall_file.
        # There is no file_contexts entry associated with postinstall_file
        # so skip the lookup.
        if rule.tctx == "postinstall_file":
            continue
        paths = pol.QueryFc(rule.tctx)
        if paths:
            domain.entrypointpaths.extend(paths)
151###
152# Get attributes associated with each domain
153#
def GetAttributes(pol):
    """Add the results of pol.QueryTypeAttribute(domain, False) to the
    attribute set of every domain in alldomains."""
    global alldomains
    for name, ctx in alldomains.items():
        ctx.attributes.update(pol.QueryTypeAttribute(name, False))
159
def GetAllTypes(pol, oldpol):
    """Store the result of GetAllTypes(False) for both policies.

    pol's result is stored in the module-level alltypes and oldpol's in
    oldalltypes. The current policy is queried (and stored) first.
    """
    global alltypes, oldalltypes
    current = pol.GetAllTypes(False)
    alltypes = current
    previous = oldpol.GetAllTypes(False)
    oldalltypes = previous
165
def setup(pol):
    """Populate the module-level domain tables from pol.

    The order is significant: attributes and entrypoints are collected
    before the app/core classification passes, which read them.
    """
    for collect in (GetAllDomains, GetAttributes, GetDomainEntrypoints):
        collect(pol)
    GetAppDomains()
    GetCoreDomains()
172
173# setup for the policy compatibility tests
def compatSetup(pol, oldpol, mapping, types):
    """Prepare the module-level state used by the compatibility tests.

    Collects the type sets of pol and oldpol, then stores mapping as
    compatMapping and types as pubtypes.
    """
    global compatMapping, pubtypes

    GetAllTypes(pol, oldpol)
    compatMapping, pubtypes = mapping, types
181
def DomainsWithAttribute(attr):
    """Return a list of the domain names in alldomains whose attribute set
    contains attr (in dictionary iteration order)."""
    global alldomains
    return [name for name, ctx in alldomains.items()
            if attr in ctx.attributes]
189
190#############################################################
191# Tests
192#############################################################
def TestCoredomainViolations():
    """Check that "coredomain" matches where each domain is launched from.

    Returns an empty string when there is nothing to report, otherwise a
    message listing (sorted, space separated) the offending domains for each
    of the two rules checked below.
    """
    global alldomains
    report = ""

    # Every domain launched from /system must have the coredomain attribute.
    missing = sorted(
        name for name, ctx in alldomains.items()
        if ctx.fromSystem and "coredomain" not in ctx.attributes)
    if missing:
        report += "The following domain(s) must be associated with the "
        report += "\"coredomain\" attribute because they are executed off of "
        report += "/system:\n"
        report += " ".join(str(x) for x in missing) + "\n"

    # No domain launched from /vendor may have the coredomain attribute.
    forbidden = sorted(
        name for name, ctx in alldomains.items()
        if ctx.fromVendor and "coredomain" in ctx.attributes)
    if forbidden:
        report += "The following domains must not be associated with the "
        report += "\"coredomain\" attribute because they are executed off of "
        report += "/vendor or /system/vendor:\n"
        report += " ".join(str(x) for x in forbidden) + "\n"

    return report
223
224###
225# Make sure that any new public type introduced in the new policy that was not
226# present in the old policy has been recorded in the mapping file.
def TestNoUnmappedNewTypes():
    """Report new public types that have no compatibility-mapping entry.

    A type is reported when it is in alltypes but not in oldalltypes, is
    listed in pubtypes, and compatMapping.rTypeattributesets has no value
    for it. Returns "" when no such type exists.
    """
    global alltypes
    global oldalltypes
    global compatMapping
    global pubtypes

    added = alltypes - oldalltypes
    unmapped = sorted(
        t for t in added
        if t in pubtypes and compatMapping.rTypeattributesets.get(t) is None)
    if not unmapped:
        return ""

    pieces = [
        "SELinux: The following public types were found added to the ",
        "policy without an entry into the compatibility mapping file(s) ",
        "found in private/compat/V.v/V.v[.ignore].cil, where V.v is the ",
        "latest API level.\n",
        " ".join(str(x) for x in unmapped) + "\n\n",
        "See examples of how to fix this:\n",
        "https://android-review.googlesource.com/c/platform/system/sepolicy/+/781036\n",
        "https://android-review.googlesource.com/c/platform/system/sepolicy/+/852612\n",
    ]
    return "".join(pieces)
250
251###
252# Make sure that any public type removed in the current policy has its
253# declaration added to the mapping file for use in non-platform policy
def TestNoUnmappedRmTypes():
    """Report removed public types that the mapping file does not declare.

    A type is reported when it is in oldalltypes but not in alltypes, is
    listed in compatMapping.pubtypes, and is absent from compatMapping.types.
    Returns "" when no such type exists.
    """
    global alltypes
    global oldalltypes
    global compatMapping

    removed = oldalltypes - alltypes
    undeclared = sorted(
        t for t in removed
        if t in compatMapping.pubtypes and t not in compatMapping.types)
    if not undeclared:
        return ""

    pieces = [
        "SELinux: The following formerly public types were removed from ",
        "policy without a declaration in the compatibility mapping ",
        "found in private/compat/V.v/V.v[.ignore].cil, where V.v is the ",
        "latest API level.\n",
        " ".join(str(x) for x in undeclared) + "\n\n",
        "See examples of how to fix this:\n",
        "https://android-review.googlesource.com/c/platform/system/sepolicy/+/822743\n",
    ]
    return "".join(pieces)
275
def TestTrebleCompatMapping():
    """Run both compatibility-mapping checks and return their messages
    concatenated (new-type check first, removed-type check second)."""
    return TestNoUnmappedNewTypes() + TestNoUnmappedRmTypes()
280
def TestViolatorAttribute(attribute):
    """Report the domains that carry the given violator attribute.

    Returns "" when FakeTreble is set or when no domain has the attribute;
    otherwise a one-line message naming the offending domains in sorted
    order.
    """
    global FakeTreble
    if FakeTreble:
        return ""

    offenders = DomainsWithAttribute(attribute)
    if not offenders:
        return ""

    message = "SELinux: The following domains violate the Treble ban "
    message += "against use of the " + attribute + " attribute: "
    message += " ".join(str(x) for x in sorted(offenders)) + "\n"
    return message
293
def TestViolatorAttributes():
    """Check each banned *_violators attribute in turn and return the
    concatenated messages."""
    banned = (
        "binder_in_vendor_violators",
        "socket_between_core_and_vendor_violators",
        "vendor_executes_system_violators",
    )
    return "".join(TestViolatorAttribute(name) for name in banned)
299
300# TODO move this to sepolicy_tests
def TestCoreDataTypeViolations():
    """Return the result of asking the loaded policy to assert that the
    vendor data paths do not carry the "core_data_file_type" attribute."""
    global pol
    vendor_data_paths = [
        "/data/vendor/",
        "/data/vendor_ce/",
        "/data/vendor_de/",
    ]
    # NOTE(review): the empty second argument is passed through unchanged;
    # its meaning is defined by policy.AssertPathTypesDoNotHaveAttr - confirm.
    return pol.AssertPathTypesDoNotHaveAttr(vendor_data_paths, [],
                                            "core_data_file_type")
305
306###
307# extend OptionParser to allow the same option flag to be used multiple times.
308# This is used to allow multiple file_contexts files and tests to be
309# specified.
310#
class MultipleOption(Option):
    """optparse Option with an additional "extend" action.

    Each occurrence of an "extend" option appends its value to a list stored
    under the option's destination, so the same flag can be given repeatedly.
    """

    # Register "extend" everywhere optparse needs to know about an action
    # that stores a typed value.
    ACTIONS = Option.ACTIONS + ("extend",)
    STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
    TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
    ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)

    def take_action(self, action, dest, opt, value, values, parser):
        """Handle "extend" here; defer every other action to Option."""
        if action != "extend":
            Option.take_action(self, action, dest, opt, value, values, parser)
            return
        # ensure_value creates the list on first use.
        collected = values.ensure_value(dest, [])
        collected.append(value)
322
# Registry of runnable tests: name accepted by --test -> function returning
# a (possibly empty) violation message.
Tests = dict([
    ("CoredomainViolations", TestCoredomainViolations),
    ("CoreDatatypeViolations", TestCoreDataTypeViolations),
    ("TrebleCompatMapping", TestTrebleCompatMapping),
    ("ViolatorAttributes", TestViolatorAttributes),
])
327
if __name__ == '__main__':
    # Command-line driver: parse and validate the options, load the policies,
    # run the requested tests (all of them when --test is not given) and exit
    # with the collected violation messages, if any.
    usage = "treble_sepolicy_tests -l $(ANDROID_HOST_OUT)/lib64/libsepolwrap.so "
    usage += "-f nonplat_file_contexts -f plat_file_contexts "
    usage += "-p curr_policy -b base_policy -o old_policy "
    usage +="-m mapping file [--test test] [--help]"
    parser = OptionParser(option_class=MultipleOption, usage=usage)
    parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE")
    parser.add_option("-u", "--base-pub-policy", dest="base_pub_policy",
                      metavar="FILE")
    parser.add_option("-f", "--file_contexts", dest="file_contexts",
            metavar="FILE", action="extend", type="string")
    parser.add_option("-l", "--library-path", dest="libpath", metavar="FILE")
    parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE")
    parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE")
    parser.add_option("-p", "--policy", dest="policy", metavar="FILE")
    # List the test names in the help text; str(Tests) would print the
    # function objects (with memory addresses) instead.
    parser.add_option("-t", "--test", dest="tests", action="extend",
            help="Test options include " + ", ".join(sorted(Tests)))
    parser.add_option("--fake-treble", action="store_true", dest="faketreble",
            default=False)

    (options, args) = parser.parse_args()

    if not options.libpath:
        sys.exit("Must specify path to libsepolwrap library\n" + parser.usage)
    if not os.path.exists(options.libpath):
        sys.exit("Error: library-path " + options.libpath + " does not exist\n"
                + parser.usage)
    if not options.policy:
        sys.exit("Must specify current monolithic policy file\n" + parser.usage)
    if not os.path.exists(options.policy):
        sys.exit("Error: policy file " + options.policy + " does not exist\n"
                + parser.usage)
    if not options.file_contexts:
        sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage)
    for f in options.file_contexts:
        if not os.path.exists(f):
            sys.exit("Error: File_contexts file " + f + " does not exist\n" +
                    parser.usage)

    # Mapping files and public platform policy are only necessary for the
    # TrebleCompatMapping test.
    # options.tests is None when no --test was given, otherwise a list of
    # test names, so membership (not identity with a string) is the right
    # check. Without it, "--test TrebleCompatMapping" would skip compatSetup()
    # and the test would run against an unset compatMapping.
    if options.tests is None or "TrebleCompatMapping" in options.tests:
        if not options.basepolicy:
            sys.exit("Must specify the current platform-only policy file\n"
                     + parser.usage)
        if not options.mapping:
            sys.exit("Must specify a compatibility mapping file\n"
                     + parser.usage)
        if not options.oldpolicy:
            sys.exit("Must specify the previous monolithic policy file\n"
                     + parser.usage)
        if not options.base_pub_policy:
            sys.exit("Must specify the current platform-only public policy "
                     + ".cil file\n" + parser.usage)
        basepol = policy.Policy(options.basepolicy, None, options.libpath)
        oldpol = policy.Policy(options.oldpolicy, None, options.libpath)
        mapping = mini_parser.MiniCilParser(options.mapping)
        pubpol = mini_parser.MiniCilParser(options.base_pub_policy)
        compatSetup(basepol, oldpol, mapping, pubpol.types)

    if options.faketreble:
        FakeTreble = True

    pol = policy.Policy(options.policy, options.file_contexts, options.libpath)
    setup(pol)

    if DEBUG:
        PrintScontexts()

    results = ""
    # If an individual test is not specified, run all tests.
    if options.tests is None:
        for t in Tests.values():
            results += t()
    else:
        for tn in options.tests:
            t = Tests.get(tn)
            if t:
                results += t()
            else:
                err = "Error: unknown test: " + tn + "\n"
                err += "Available tests:\n"
                err += "".join(name + "\n" for name in Tests)
                sys.exit(err)

    # A non-empty string makes sys.exit() print it and return a failure
    # status; an empty result falls through to a normal exit.
    if len(results) > 0:
        sys.exit(results)
416