1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the 'License');
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an 'AS IS' BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import sys, argparse, os
17import subprocess
18import re
19import queue
20from threading  import Thread
21import itertools
22import time
23
class CurrentUserState:
    """ State which keeps the tests on the user that is already current.

    The current user is looked up once, when the state is constructed, and
    no user switch is performed when the state is entered.
    """

    def __init__(self, args):
        self.args = args
        # Resolved a single time at construction; get_user() returns this value.
        self.current_user = get_current_user(args)

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_CURRENT_USER"

    def get_user(self):
        """ Return the user recorded when the state was constructed. """
        return self.current_user

    def is_active(self, device_state):
        """ Always True: this state does not depend on device_state. """
        return True

    def initialise(self, device_state):
        """ Nothing to record for this state. """
        return None

    def enter(self, device_state):
        """ Log that the state is being entered; no device command is run. """
        debug(self.args, "[Test] Entering state %s" % self.name())

    def include_annotations(self, args):
        """ Return the annotation names for this state (always an empty list). """
        return list()

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
49
class SystemUserState:
    """ State in which the tests run on user 0. """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_SYSTEM_USER"

    def get_user(self):
        """ Return the id of the user this state runs on, which is always 0. """
        return 0

    def is_active(self, device_state):
        """ Return True when the device's current user is user 0. """
        current_user = device_state["current_user"]
        return current_user == 0

    def include_annotations(self, args):
        """ Return the annotation names for this state.

        Only headless runs (args.headless) get an annotation; otherwise the
        list is empty.
        """
        if not args.headless:
            return []
        # We want to skip all of the RequireRunOnPrimaryUser ones which get assumption failed
        return ["com.android.bedstead.harrier.annotations.RequireRunOnSystemUser"]

    def initialise(self, device_state):
        """ Nothing to record for this state. """
        return None

    def enter(self, device_state):
        """ Log the transition and switch the device to user 0. """
        debug(self.args, "[Test] Entering state %s" % self.name())
        switch_to_system_user = ["adb", "shell", "am", "switch-user", "0"]
        execute_shell_command("Test", self.args, switch_to_system_user)

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
78
class SecondaryUserState:
    """ State in which the tests run on a secondary user.

    The user id is recorded either by initialise() (the user which is already
    current) or by enter() (the user returned by get_or_create_secondary_user).
    get_user() is only valid after one of those has been called.
    """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_SECONDARY_USER"

    def is_active(self, device_state):
        """ Return whether the device is already in this state.

        The current user must satisfy is_secondary_user. On headless runs it
        must additionally be the user that get_or_create_secondary_user
        returns.
        """
        if not is_secondary_user(device_state, device_state["users"][device_state["current_user"]]):
            return False
        if not self.args.headless:
            return True

        secondary_user_id = get_or_create_secondary_user(device_state, self.args)
        return device_state["current_user"] == secondary_user_id

    def include_annotations(self, args):
        """ Return the annotation names for this state. """
        return ["com.android.bedstead.harrier.annotations.RequireRunOnSecondaryUser"]

    def initialise(self, device_state):
        """ Record the device's current user as the user of this state. """
        self.user_id = device_state["current_user"]

    def enter(self, device_state):
        """ Switch to the secondary user and install the test packages for it.

        For every selected module, the module package and the packages of its
        additional apps are installed with `pm install-existing`.
        """
        debug(self.args, "[Test] Entering state " + self.name())

        self.user_id = get_or_create_secondary_user(device_state, self.args)
        user = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", user])
        for module in self.args.modules:
            config = supported_modules[module]
            packages = [config[PACKAGE_NAME]]
            packages.extend(app[PACKAGE_NAME] for app in config.get(ADDITIONAL_APPS, []))
            for package in packages:
                # Always an argument list without shell=True: a multi-element
                # list combined with shell=True makes the shell execute only
                # its first element (assuming the keyword arguments are
                # forwarded to subprocess), which left the additional apps
                # uninstalled for this user.
                execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", user, package])

    def get_user(self):
        """ Return the user id recorded by initialise() or enter(). """
        return self.user_id

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
118
class AdditionalUserState:
    """ This state is only useful for headless devices.

    The user id is recorded either by initialise() (the user which is already
    current) or by enter() (the user returned by get_or_create_additional_user).
    get_user() is only valid after one of those has been called.
    """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_ADDITIONAL_USER"

    def is_active(self, device_state):
        """ Return whether the device's current user satisfies is_additional_user. """
        return is_additional_user(device_state, device_state["users"][device_state["current_user"]])

    def include_annotations(self, args):
        """ Return the annotation names for this state. """
        return ["com.android.bedstead.harrier.annotations.RequireRunOnAdditionalUser"]

    def initialise(self, device_state):
        """ Record the device's current user as the user of this state. """
        self.user_id = device_state["current_user"]

    def enter(self, device_state):
        """ Switch to the additional user and install the test packages for it.

        For every selected module, the module package and the packages of its
        additional apps are installed with `pm install-existing`.
        """
        debug(self.args, "[Test] Entering state " + self.name())

        self.user_id = get_or_create_additional_user(device_state, self.args)
        user = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", user])
        for module in self.args.modules:
            config = supported_modules[module]
            packages = [config[PACKAGE_NAME]]
            packages.extend(app[PACKAGE_NAME] for app in config.get(ADDITIONAL_APPS, []))
            for package in packages:
                # Always an argument list without shell=True: a multi-element
                # list combined with shell=True makes the shell execute only
                # its first element (assuming the keyword arguments are
                # forwarded to subprocess), which left the additional apps
                # uninstalled for this user.
                execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", user, package])

    def get_user(self):
        """ Return the user id recorded by initialise() or enter(). """
        return self.user_id

    def all_supported_annotations(self, args):
        """ Return this state's annotations plus RequireRunOnSecondaryUser. """
        return self.include_annotations(args) + ["com.android.bedstead.harrier.annotations.RequireRunOnSecondaryUser"]
153
class WorkProfileState:
    """ State in which the tests run on a work profile (user type "profile.MANAGED").

    The profile's user id is recorded either by initialise() (from the
    current user's "work_profile_id" entry) or by enter(). get_user() is only
    valid after one of those has been called.
    """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_WORK_PROFILE"

    def is_active(self, device_state):
        """ Return whether the device is already in this state.

        On headless runs the current user must not be user 0; otherwise it
        must be user 0. In both cases the current user's entry must contain a
        "work_profile_id".
        """
        if self.args.headless:
            if device_state["current_user"] == 0:
                return False
        else:
            if not device_state["current_user"] == 0:
                return False
        return self._has_work_profile(device_state["users"][device_state["current_user"]])

    def include_annotations(self, args):
        """ Return the annotation names for this state. """
        return ["com.android.bedstead.harrier.annotations.RequireRunOnWorkProfile"]

    def initialise(self, device_state):
        """ Record the work profile id stored on the device's current user. """
        self.user_id = device_state["users"][device_state["current_user"]]["work_profile_id"]

    def enter(self, device_state):
        """ Switch to the profile's parent, start the profile and install the test packages.

        For every selected module, the module package and the packages of its
        additional apps are installed for the profile with `pm install-existing`.
        """
        debug(self.args, "[Test] Entering state " + self.name())
        user = self._get_or_create_work_profile(device_state)
        self.user_id = user["id"]
        profile = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
        # The sub-command is passed without the stray trailing space it used to carry.
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user", profile])
        for module in self.args.modules:
            config = supported_modules[module]
            packages = [config[PACKAGE_NAME]]
            packages.extend(app[PACKAGE_NAME] for app in config.get(ADDITIONAL_APPS, []))
            for package in packages:
                # Always an argument list without shell=True: a multi-element
                # list combined with shell=True makes the shell execute only
                # its first element (assuming the keyword arguments are
                # forwarded to subprocess), which left the additional apps
                # uninstalled for this profile.
                execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", profile, package])

    def _get_or_create_work_profile(self, device_state):
        """ Return the first existing work profile, creating one if there is none.

        A newly created profile is parented to user 0, or on headless runs to
        the user returned by get_or_create_secondary_user. The dictionary built
        for a new profile carries the keys "id", "type", "flags" and "parent".
        """
        users = get_users(self.args)
        for user in users.values():
            if self._is_work_profile(user):
                return user

        parent_id = 0 if not self.args.headless else get_or_create_secondary_user(device_state, self.args)

        work_profile_id = create_work_profile(device_state, self.args, parent_id)
        return {"id": work_profile_id, "type": "profile.MANAGED", "flags": None, "parent": str(parent_id)}

    def get_user(self):
        """ Return the profile id recorded by initialise() or enter(). """
        return self.user_id

    def _has_work_profile(self, user):
        """ Return whether the user entry carries a "work_profile_id". """
        return "work_profile_id" in user

    def _is_work_profile(self, user):
        """ Return whether the user entry has the type "profile.MANAGED". """
        return user["type"] == "profile.MANAGED"

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
211
class CloneProfileState:
    """ State in which the tests run on a clone profile (user type "profile.CLONE").

    The profile's user id is recorded either by initialise() (from the
    current user's "clone_profile_id" entry) or by enter(). get_user() is only
    valid after one of those has been called.
    """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_CLONE_PROFILE"

    def is_active(self, device_state):
        """ Return whether the device is already in this state.

        Never active on headless runs. Otherwise the current user must satisfy
        is_clone_profile and its entry must contain a "clone_profile_id".
        """
        if not is_clone_profile(device_state, device_state["users"][device_state["current_user"]]):
            return False
        if self.args.headless:
            return False
        return self._has_clone_profile(device_state["users"][device_state["current_user"]])

    def include_annotations(self, args):
        """ Return the annotation names for this state. """
        return ["com.android.bedstead.harrier.annotations.RequireRunOnCloneProfile"]

    def initialise(self, device_state):
        """ Record the clone profile id stored on the device's current user. """
        self.user_id = device_state["users"][device_state["current_user"]]["clone_profile_id"]

    def enter(self, device_state):
        """ Switch to the profile's parent, start the profile and install the test packages.

        For every selected module, the module package and the packages of its
        additional apps are installed for the profile with `pm install-existing`.
        """
        debug(self.args, "[Test] Entering state " + self.name())
        user = self._get_or_create_clone_profile(device_state)
        debug(self.args, "[Test] clone is " + str(user))
        self.user_id = user["id"]
        profile = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
        # The sub-command is passed without the stray trailing space it used to carry.
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user", profile])
        for module in self.args.modules:
            config = supported_modules[module]
            packages = [config[PACKAGE_NAME]]
            packages.extend(app[PACKAGE_NAME] for app in config.get(ADDITIONAL_APPS, []))
            for package in packages:
                # Always an argument list without shell=True: a multi-element
                # list combined with shell=True makes the shell execute only
                # its first element (assuming the keyword arguments are
                # forwarded to subprocess), which left the additional apps
                # uninstalled for this profile.
                execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", profile, package])

    def _get_or_create_clone_profile(self, device_state):
        """ Return the first existing clone profile, creating one under user 0 if there is none.

        The dictionary built for a new profile carries the keys "id", "type",
        "flags" and "parent".
        """
        users = get_users(self.args)
        for user in users.values():
            if is_clone_profile(device_state, user):
                return user

        parent_id = 0 # Clone user only supported on system user only as of now.

        clone_profile_id = create_clone_profile(device_state, self.args, parent_id)
        return {"id": clone_profile_id, "type": "profile.CLONE", "flags": None, "parent": str(parent_id)}

    def get_user(self):
        """ Return the profile id recorded by initialise() or enter(). """
        return self.user_id

    def _has_clone_profile(self, user):
        """ Return whether the user entry carries a "clone_profile_id". """
        return "clone_profile_id" in user

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
265
266
class PrivateProfileState:
    """ State in which the tests run on a private profile (user type "profile.PRIVATE").

    The profile's user id is recorded either by initialise() (from the
    current user's "private_profile_id" entry) or by enter(). get_user() is
    only valid after one of those has been called.
    """

    def __init__(self, args):
        self.args = args

    def name(self):
        """ Return the identifier of this state. """
        return "RUN_ON_PRIVATE_PROFILE"

    def is_active(self, device_state):
        """ Return whether the device is already in this state.

        Never active on headless runs. Otherwise the current user must satisfy
        is_private_profile and its entry must contain a "private_profile_id".
        """
        if not is_private_profile(device_state, device_state["users"][device_state["current_user"]]):
            return False
        if self.args.headless:
            return False
        return self._has_private_profile(device_state["users"][device_state["current_user"]])

    def include_annotations(self, args):
        """ Return the annotation names for this state. """
        return ["com.android.bedstead.harrier.annotations.RequireRunOnPrivateProfile"]

    def initialise(self, device_state):
        """ Record the private profile id stored on the device's current user. """
        self.user_id = device_state["users"][device_state["current_user"]]["private_profile_id"]

    def enter(self, device_state):
        """ Switch to the profile's parent, start the profile and install the test packages.

        For every selected module, the module package and the packages of its
        additional apps are installed for the profile with `pm install-existing`.
        """
        debug(self.args, "[Test] Entering state " + self.name())
        user = self._get_or_create_private_profile(device_state)
        debug(self.args, "[Test] private profile is " + str(user))
        self.user_id = user["id"]
        profile = str(self.user_id)
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
        # The sub-command is passed without the stray trailing space it used to carry.
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user", profile])
        for module in self.args.modules:
            config = supported_modules[module]
            packages = [config[PACKAGE_NAME]]
            packages.extend(app[PACKAGE_NAME] for app in config.get(ADDITIONAL_APPS, []))
            for package in packages:
                # Always an argument list without shell=True: a multi-element
                # list combined with shell=True makes the shell execute only
                # its first element (assuming the keyword arguments are
                # forwarded to subprocess), which left the additional apps
                # uninstalled for this profile.
                execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", profile, package])

    def _get_or_create_private_profile(self, device_state):
        """ Return the first existing private profile, creating one under user 0 if there is none.

        The dictionary built for a new profile carries the keys "id", "type",
        "flags" and "parent".
        """
        users = get_users(self.args)
        for user in users.values():
            if is_private_profile(device_state, user):
                return user

        parent_id = 0 # Private profiles are only supported on system user only as of now.

        private_profile_id = create_private_profile(device_state, self.args, parent_id)
        return {"id": private_profile_id, "type": "profile.PRIVATE", "flags": None, "parent": str(parent_id)}

    def get_user(self):
        """ Return the profile id recorded by initialise() or enter(). """
        return self.user_id

    def _has_private_profile(self, user):
        """ Return whether the user entry carries a "private_profile_id". """
        return "private_profile_id" in user

    def all_supported_annotations(self, args):
        """ Return the same list as include_annotations. """
        return self.include_annotations(args)
320
# Each RUN_ON_* name is an alias for the state class itself (not an instance).
RUN_ON_CURRENT_USER = CurrentUserState
RUN_ON_SYSTEM_USER = SystemUserState
RUN_ON_SECONDARY_USER = SecondaryUserState
RUN_ON_ADDITIONAL_USER = AdditionalUserState
RUN_ON_WORK_PROFILE = WorkProfileState
RUN_ON_CLONE_PROFILE = CloneProfileState
RUN_ON_PRIVATE_PROFILE = PrivateProfileState

# Maps the single-letter codes accepted by --states to the state they select.
STATE_CODES = dict(
    c=RUN_ON_CURRENT_USER,
    s=RUN_ON_SYSTEM_USER,
    y=RUN_ON_SECONDARY_USER,
    w=RUN_ON_WORK_PROFILE,
    a=RUN_ON_ADDITIONAL_USER,
    l=RUN_ON_CLONE_PROFILE,
    p=RUN_ON_PRIVATE_PROFILE,
    i="i",  # SPECIAL CASE DEALT WITH AT THE START OF PARSING
)

# Codes used when --states is not given.
DEFAULT_STATES = "csywalp"
341
# Abbreviations which may start a target, paired with the package prefix
# they stand for. expand_short_target tries them in this order and stops at
# the first match, so "a.d.c.t" has to stay ahead of "a.d.c".
SHORT_PACKAGE_PREFIXES = dict([
    ("a.d.i", "android.devicepolicy.internal"),
    ("a.d.c.t", "android.devicepolicy.cts.telephony"),
    ("a.d.c", "android.devicepolicy.cts"),
    ("a.d.g", "android.devicepolicy.gts"),
    ("a.m.c", "android.multiuser.cts"),
    ("nene", "com.android.bedstead.nene"),
])
350
# We hardcode supported modules so we can optimise for
# development of those modules. It is not our intention to support all tests.
#
# Each entry maps a module name to its configuration:
#   "package"            - package name of the test APK
#   "path"               - (optional) prefix used instead of "package" when
#                          guessing the module from a class name
#   "runner"             - instrumentation runner class
#   "states"             - states the module is listed for
#   "additional_apps"    - (optional) extra apps, each with "target" and "package"
#   "additional_targets" - (optional) extra build targets
#   "files"              - (optional) files to push, each with "from" and "to"
supported_modules = {
    # XTS
    "CtsDevicePolicyTestCases": {
        "package": "android.devicepolicy.cts",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    # This module used to be listed twice in this dictionary. Python keeps the
    # last value for a repeated key, so only the second definition (with this
    # shorter list of states) was ever in effect; it is the one kept here.
    "CtsDevicePolicySimTestCases": {
        "package": "android.devicepolicy.cts.telephony",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_ADDITIONAL_USER]
    },
    "GtsDevicePolicyTestCases": {
        "package": "android.devicepolicy.gts",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "GtsInteractiveAudioTestCases": {
        "package": "com.google.android.audio.gts",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "CtsMultiUserTestCases": {
        "package": "android.multiuser.cts",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "CtsAccountManagerMultiuserTestCases": {
        "package": "android.accounts.cts.multiuser",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER]
    },

    # Internal
    "InternalDevicePolicyTestCases": {
        "package": "android.devicepolicy.internal",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },

    # Bedstead
    "ActivityContextTest": {
        "package": "com.android.activitycontext.test",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },

    "DeviceAdminAppTest": {
        "package": "com.android.bedstead.deviceadminapp.test",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "EventLibTest": {
        "package": "com.android.eventlib.test",
        "path": "com.android.eventlib",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
        "additional_apps": [{"target": "EventLibTestApp", "package": "com.android.eventlib.tests.testapp"}]
    },
    "HarrierTest": {
        "package": "com.android.bedstead.harrier.test",
        "path": "com.android.bedstead.harrier",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "NeneTest": {
        "package": "com.android.bedstead.nene.test",
        "path": "com.android.bedstead.nene",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
        "additional_targets": ["NeneTestApp1"],
        "files": [{"from": "NeneTestApp1.apk", "to": "/data/local/tmp/NeneTestApp1.apk"}]
    },
    "BedsteadQueryableTest": {
        "package": "com.android.queryable.test",
        "path": "com.android.queryable",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "RemoteDPCTest": {
        "package": "com.android.bedstead.remotedpc.test",
        "path": "com.android.bedstead.remotedpc",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },
    "RemoteAccountAuthenticatorTest": {
        "package": "com.android.bedstead.remoteaccountauthenticator.test",
        "path": "com.android.bedstead.remoteaccountauthenticator",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER]
    },
    "TestAppTest": {
        "package": "com.android.bedstead.testapp.test",
        "path": "com.android.bedstead.testapp",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
    },

    # Modules
    "bedstead-usb-test": {
        "package": "com.android.bedstead.usb.test",
        "path": "com.android.bedstead.usb",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-adb-test": {
        "package": "com.android.bedstead.adb.test",
        "path": "com.android.bedstead.adb",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-root-test": {
        "package": "com.android.xts.root.test",
        "path": "com.android.xts.root",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-permissions-test": {
        "package": "com.android.bedstead.permissions.test",
        "path": "com.android.bedstead.permissions",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-flags-test": {
        "package": "com.android.bedstead.flags.test",
        "path": "com.android.bedstead.flags",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-multiuser-test": {
        "package": "com.android.bedstead.multiuser.test",
        "path": "com.android.bedstead.multiuser",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
    "bedstead-enterprise-test": {
        "package": "com.android.bedstead.enterprise.test",
        "path": "com.android.bedstead.enterprise",
        "runner": "androidx.test.runner.AndroidJUnitRunner",
        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
    },
}
500
# Keys of the per-module configuration dictionaries.
TARGET_NAME = "target"
PACKAGE_NAME = "package"
PATH = "path"
RUNNER = "runner"
STATES = "states"
ADDITIONAL_APPS = "additional_apps"
ADDITIONAL_TARGETS = "additional_targets"
FILES = "files"

# Theme configuration
# All values are ANSI "ESC [ <n> m" escape sequences.
RESET_CODE = "\33[0m"

CLASS_NAME_COLOUR_CODE = "\33[35m"
TEST_NAME_COLOUR_CODE = "\33[33m"

PASSED_CODE = "\33[32m"
FAILED_CODE = "\33[31m"
IGNORED_CODE = "\33[33m"

# One escape sequence per numeric code listed below, in this order.
AVAILABLE_PARAMETER_COLOUR_CODES = [
    "\33[%dm" % sgr_code
    for sgr_code in (40, 41, 42, 43, 44, 45, 46, 101, 104, 105)
]
530
def find_module_for_class_method(class_method):
    """ If only a class#method is provided, see if we can find the module. Will return None if not.

    A module matches when class_method starts with the module's "path" (or
    with its "package" when no "path" is configured). When several modules
    match, the one with the longest, i.e. most specific, prefix wins, so that
    a class in a nested package (such as "android.devicepolicy.cts.telephony")
    resolves to the module for that package rather than being reported as
    ambiguous with the module of the enclosing package.

    If several modules share the longest matching prefix the choice is still
    ambiguous: a message is printed and the process exits with status 1.
    """
    candidates = []
    for module, config in supported_modules.items():
        path = config.get(PATH, config[PACKAGE_NAME])

        if class_method.startswith(path):
            candidates.append((len(path), module))

    if not candidates:
        return None

    longest = max(length for length, _ in candidates)
    most_specific = [module for length, module in candidates if length == longest]
    if len(most_specific) == 1:
        return most_specific[0]

    print("Found multiple potential modules. Please add module name to command")
    sys.exit(1)
547
def get_args():
    """ Parse command line arguments.

    Besides the values parsed by argparse, the returned namespace is
    post-processed:

    * If none of --build, --install and --test is given, all three are enabled.
    * args.states becomes a list of the STATE_CODES entries for the requested
      codes (DEFAULT_STATES when --states is omitted). Duplicate codes are
      dropped and the order in which the codes were given is kept, so the
      order is the same on every run.
    * args.interactive becomes False when interactive mode is not requested;
      args.build_interactive, args.install_interactive, args.automate_interactive
      and args.manual_interactive hold the individual interactive options.
      Passing -n without a value enables all of them.
    * load_module_and_class_methods(args) is called before returning.

    Exits with status 1 when an invalid state code or interactive instruction
    is given.
    """
    parser = argparse.ArgumentParser(description="Run tests during development")
    parser.add_argument("targets", nargs="+", type=str, help="The target to run. This is in the form module:class#method. Method and class are optional. Use LAST_PASSES, LAST_FAILURES, LAST_IGNORED, or LAST_ASSUMPTION_FAILED to include the passed/failed/ignored/assumption failed tests from the most recent run.")
    parser.add_argument("-b", "--build", action='store_true', help="Builds test targets. (default)")
    parser.add_argument("-i", "--install", action='store_true', help="Installs test targets. (default)")
    parser.add_argument("-t", "--test", action='store_true', help="Runs tests. (default)")
    parser.add_argument("-d", "--debug", action="store_true", help="Include debug output.")
    parser.add_argument("-c", "--coexistence", action="store", default="?", help="Force device policy coexistence on or off (ignoring bedstead test preferences)")
    parser.add_argument("-s", "--states", help="Specify states which should be included. Options are (c)urrent (s)ystem secondar(y), (w)ork profile, c(l)one (i)nitial, (a)dditional, (p)rivate profile. Defaults to all states.")
    parser.add_argument("-n", "--interactive", action='store', nargs='?', default="disabled", help="Run interactive tests. This will exclude non-interactive tests and will enable manual testing. Pass 'bi' to build and install automations, 't' to use automations (default) and m to enable manual interaction (default)")
    parser.add_argument("--stop-after-first-failure", action="store_true", help="If true, will stop execution after encountering a single failure")
    parser.add_argument("--rerun-after-all-pass", action="store_true", help="If true, will re-start the run if all pass")
    parser.add_argument("-r", "--root", action="store_true", help="Run rooted tests. This will ensure that adb is running as root and will include tests annotated as @RequireAdbRoot - which are otherwise excluded.")
    # parser.add_argument("--capture-bugreport-after-crash", action="store_true", help="If true, will capture a bug report on failure due to a process or system server crash")
    parser.add_argument("--timer", action="store", choices=['junit', 'btest'], default='junit', help="Either 'junit' (default) which is the internal junit timer and reflects the numbers shown on dashboards, etc. or 'btest' which tracks the actual time from the start of the test to the result being posted (including bedstead setup and teardown)")
    args = parser.parse_args()

    # With no explicit step requested, run the whole build/install/test cycle.
    if not args.build and not args.install and not args.test:
        args.build = True
        args.install = True
        args.test = True

    if not args.states:
        args.states = DEFAULT_STATES
    valid_states = ["c", "s", "y", "w", "l", "a", "i", "p"]
    # De-duplicate while keeping the given order. A set would iterate in an
    # order which can change from one run to the next.
    state_codes = []
    for state in args.states:
        if state not in valid_states:
            print("State " + state + " is invalid, must be one of " + str(valid_states))
            sys.exit(1)
        if state not in state_codes:
            state_codes.append(state)
    args.states = [STATE_CODES[code] for code in state_codes]

    args.build_interactive = False
    args.install_interactive = False
    args.manual_interactive = False
    args.automate_interactive = False
    if not args.interactive:
        # -n was passed without a value: enable every interactive option.
        args.build_interactive = True
        args.install_interactive = True
        args.manual_interactive = True
        args.automate_interactive = True
        args.interactive = "enabled"

    if args.interactive == "disabled":
        args.interactive = False
    elif args.interactive != "enabled":
        # -n was passed with a value: each character switches on one option.
        valid_instructions = ["b", "i", "t", "m"]
        for instruction in args.interactive:
            if instruction == "b":
                args.build_interactive = True
            elif instruction == "i":
                args.install_interactive = True
            elif instruction == "t":
                args.automate_interactive = True
            elif instruction == "m":
                args.manual_interactive = True
            else:
                print("Instruction " + instruction + " is invalid, must be one of " + str(valid_instructions))
                sys.exit(1)

    load_module_and_class_methods(args)

    return args
611
def expand_short_target(target):
    """ Expand a single command line target into a list of targets.

    * LAST_FAILURES, LAST_PASSES, LAST_IGNORED and LAST_ASSUMPTION_FAILED are
      replaced by the targets listed, one per line, in the matching btest_*.txt
      file inside the directory named by the OUT environment variable. Lines
      are stripped and blank lines are skipped, so a stray empty line does not
      become an empty target.
    * Otherwise, if the target starts with one of SHORT_PACKAGE_PREFIXES, the
      first matching abbreviation is replaced by its full package prefix.
    * Any other target is returned unchanged, as a one-element list.

    Raises KeyError if OUT is not set and an error from open() if the file
    cannot be read.
    """
    last_run_files = {
        "LAST_FAILURES": "btest_failures.txt",
        "LAST_PASSES": "btest_passes.txt",
        "LAST_IGNORED": "btest_ignored.txt",
        "LAST_ASSUMPTION_FAILED": "btest_assumption_failed.txt",
    }
    if target in last_run_files:
        out = os.environ["OUT"]
        with open(out + "/" + last_run_files[target], "r") as f:
            stripped_lines = [line.strip() for line in f]
        return [line for line in stripped_lines if line]

    # Only the first matching abbreviation is applied, so the order of
    # SHORT_PACKAGE_PREFIXES decides between overlapping abbreviations.
    for short, full in SHORT_PACKAGE_PREFIXES.items():
        if target.startswith(short):
            return [full + target[len(short):]]
    return [target]
635
def flatten(list_of_lists):
    """ Return one list holding the items of every inner iterable, in order. """
    return [item for inner in list_of_lists for item in inner]
638
def load_module_and_class_methods(args):
    """ Parse targets from args and load module and class_method.

    On return args.targets is a list of (module, class_method) tuples, where
    class_method is None when the target named only a module, and
    args.modules is the set of modules which appear in those tuples.

    Exits with status 0 when there is nothing to run, and with status 1 when
    a target's module is neither supported nor guessable from its class name.
    """
    expanded_targets = []
    for raw_target in args.targets:
        expanded_targets.extend(expand_short_target(raw_target))
    args.targets = expanded_targets

    if not args.targets:
        print("No targets to run")
        sys.exit(0)

    resolved = []
    for target in args.targets:
        # Split on the first ":" only; without one there is no class_method.
        module, separator, remainder = target.partition(":")
        class_method = remainder if separator else None

        if module not in supported_modules:
            # Let's guess that maybe they omitted the module
            class_method = module
            module = find_module_for_class_method(class_method)
            if not module:
                print("Could not find module or module not supported " + class_method + ". btest only supports a small number of test modules.")
                sys.exit(1)

        resolved.append((module, class_method))

    args.targets = resolved
    args.modules = {module for module, _ in resolved}
664
def build_modules(args):
    """ Build the requested modules and, if asked for, InteractiveAutomation.

    A single bash command line is assembled: it sources build/envsetup.sh
    from ANDROID_BUILD_TOP and then runs one "m <target>" per target. Nothing
    is executed when neither a regular build nor an interactive build is
    requested. Exits with status 1 when the build output reports failed
    targets. Raises KeyError if ANDROID_BUILD_TOP is not set.
    """
    build_top = os.environ["ANDROID_BUILD_TOP"]

    # Unfortunately I haven't figured out a way to just import the envsetup so we need to run it each time
    pieces = [". " + build_top + "/build/envsetup.sh"]

    if args.build:
        # Start from the modules, then add what each target's module needs.
        targets = args.modules.copy()
        for target in args.targets:
            config = supported_modules[target[0]]
            targets.update(config.get(ADDITIONAL_TARGETS, []))
            targets.update([app[TARGET_NAME] for app in config.get(ADDITIONAL_APPS, [])])

        pieces.append(" &&" + " && ".join(["m " + name for name in targets]))

    if args.interactive and args.build_interactive:
        pieces.append(" && m InteractiveAutomation")

    # Only the envsetup piece is present when there is nothing to build.
    if len(pieces) == 1:
        return

    command = "".join(pieces)
    debug(args, "[Build] Executing '" + command + "'")

    # TODO: We should also stream the output
    output, _ = execute_shell_command("BUILD", args, [command], shell=True, executable="/bin/bash")
    print(output)
    if "failed to build some targets" in output:
        sys.exit(1)
693
694
def install(args):
    """Install test APKs and push supporting files to the device over adb.

    Steps, in order:
      1. When args.install is set, install each module's APK from $OUT/testcases.
      2. When args.interactive and args.install_interactive are set, push the
         InteractiveAutomation APK to /sdcard.
      3. Install every ADDITIONAL_APPS entry declared by each module.
      4. Push every FILES entry declared by each module.

    Steps 3 and 4 run regardless of args.install.
    """
    out = os.environ["OUT"]

    def run_install_step(shell_line):
        # Every step goes through bash so that the "*" globs in the paths are expanded
        execute_shell_command("Install", args, [shell_line], shell=True, executable="/bin/bash")

    def apk_install_line(apk_name):
        return "adb install --user all -t -g " + out + "/testcases/" + apk_name + "/*/" + apk_name + ".apk"

    if args.install:
        for module in args.modules:
            run_install_step(apk_install_line(module))

    if args.interactive and args.install_interactive:
        run_install_step("adb push " + out + "/system/app/InteractiveAutomation/InteractiveAutomation.apk /sdcard")

    for module in args.modules:
        for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
            run_install_step(apk_install_line(additional_app[TARGET_NAME]))

    for module in args.modules:
        for file_spec in supported_modules[module].get(FILES, []):
            run_install_step("adb push " + out + "/testcases/*/" + file_spec["from"] + " " + file_spec["to"])
716
class Test:
    """Runs one module's instrumentation in a single user state and reports each result.

    The instrumentation is launched detached on the device (``nohup ... &``) with
    its output redirected to /sdcard/ex.log and its pid written to
    /sdcard/test_pid.txt. Individual results are then polled from the module's
    BedsteadRunResultsProvider content provider using short adb calls, so a usb
    disconnection during the run does not lose results.
    """

    def __init__(self, args, module, class_methods, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, has_later_states):
        """Capture the configuration for one instrumentation run.

        Args:
            args: parsed command line arguments.
            module: key into supported_modules for the module under test.
            class_methods: class/method filters; an empty list runs everything.
            state: user state object; its get_user() and name() are used.
            btest_run: BtestRun that collects results across all runs.
            total_test_count: tests counted so far by earlier runs.
            next_test: 1-based index to print for the next result.
            include_annotations: annotations the tests must have (copied).
            exclude_annotations: annotations the tests must not have (copied).
            has_later_states: True if more states will run after this one.
        """
        self.args = args
        self.state = state
        self.module_package = supported_modules[module][PACKAGE_NAME]
        self.runner = supported_modules[module][RUNNER]
        self.class_methods = class_methods
        self.parameter_colour_codes = {}
        self.available_parameter_colour_codes_pointer = 0
        self.total_test_count = total_test_count
        self.next_test = next_test
        self.btest_run = btest_run
        self.test_results = {}
        # Copies, so the per-run additions below do not leak into the caller's lists
        self.include_annotations = include_annotations.copy()
        self.exclude_annotations = exclude_annotations.copy()
        self.has_no_tests = False # Marked at the end of the test if there were no new tests
        self.has_loaded_total_test_count = False # Used to ensure we don't double count test counts
        self.has_later_states = has_later_states # True if we don't know the full number of tests because we'll be running more states later

        if self.args.interactive:
            self.include_annotations.append("com.android.interactive.annotations.Interactive")
        else:
            self.exclude_annotations.append("com.android.interactive.annotations.Interactive")

        if self.args.root:
            self.include_annotations.append("com.android.xts.root.annotations.RequireAdbRoot")
        else:
            self.exclude_annotations.append("com.android.xts.root.annotations.RequireAdbRoot")

    def run(self):
        """Start the instrumentation on the device and print results as they arrive.

        Updates self.total_test_count and self.next_test as results are seen and
        records each result in self.btest_run (through print_result).

        Raises:
            KeyboardInterrupt: after killing the test process, when
                args.stop_after_first_failure is set and a test fails.
            SystemExit: when the output reports that the process crashed
                before executing the test.
        """
        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user ", str(self.state.get_user())])

        # This prepares the devices to begin running a test without a persistent adb connection.
        # it will write the ongoing result log to /sdcard/ex.log and then we will repeatedly
        # check that - allowing for usb disconnection without losing the results
        execute_shell_command("Test", self.args, ["adb", "shell", "rm -f /sdcard/ex.log && rm -f /sdcard/ex_now.log && rm -f /sdcard/ex_read.log && rm -f /sdcard/test_pid.txt && touch /sdcard/ex_now.log"])

        command = "adb shell nohup am instrument --user " + str(self.state.get_user())

        # Use the formatted output
        command += " -e listener com.android.bedstead.harrier.BedsteadRunListener"

        if self.include_annotations:
            command += " -e annotation " + ",".join(self.include_annotations)

        if self.exclude_annotations:
            command += " -e notAnnotation " + ",".join(self.exclude_annotations)

        # btest supports untethering during the run
        command += " -e CAN_UNTETHER true"

        if self.args.interactive:
            if self.args.manual_interactive:
                command += " -e ENABLE_MANUAL true"
            else:
                command += " -e ENABLE_MANUAL false"
            if self.args.automate_interactive:
                command += " -e ENABLE_AUTOMATION true"
            else:
                command += " -e ENABLE_AUTOMATION false"

        if self.args.coexistence == "yes":
            command += " -e COEXISTENCE true"
        elif self.args.coexistence == "no":
            command += " -e COEXISTENCE false"

        if len(self.class_methods) > 0:
            if any("*" in s for s in self.class_methods):
                if len(self.class_methods) > 1:
                    print("Error. If you use a wildcard target, you can only specify one target")
                    sys.exit(1)
                for class_method in self.class_methods:
                    # class_method = self.class_methods[0]
                    # We need to escape 3 times to get through the various interpreters
                    # Using regex adds 5 seconds or so to running a single test so has to be opt-in
                    command += " -e tests_regex " + ".*".join([re.escape(re.escape(re.escape(s))) for s in class_method.split("*")])
            else:
                command += " -e class " + ",".join(self.class_methods)

        command += " -w " + self.module_package + "/" + self.runner

        # This means we will have /sdcard/ex.log with the test results and /sdcard/test_pid.txt
        # with the pid of the process. We can check the pid to see if the tests are still running
        command += " > /sdcard/ex.log 2>&1 & echo $! > /sdcard/test_pid.txt"

        if self.args.debug:
            print("[Test] Executing '" + command + "'")

        def enqueue_output(output_queue):
            # Fetch the log lines that are new since the previous snapshot and queue them
            output, stderr = execute_shell_command("Test", self.args, ["adb", "shell", "cp /sdcard/ex_now.log /sdcard/ex_read.log && cp /sdcard/ex.log /sdcard/ex_now.log && comm -23 /sdcard/ex_now.log /sdcard/ex_read.log"])
            for line in output.split("\n"):
                debug(self.args, "[DEBUG] " + line)

                if "Time: " in line:
                    debug(self.args, "[DEBUG] Output finished")
                    self.tests_finished = True
                output_queue.put(line)

        execute_shell_command("Test", self.args, command.split(" "))
        self.tests_finished = False
        self.test_process_queue = queue.Queue()
        self.test_process_thread = Thread(target=enqueue_output, args=(self.test_process_queue,))
        self.test_process_thread.daemon = True
        self.test_process_thread.start()

        start_timer = time.monotonic_ns()

        num_tests = self.get_num_tests()
        if num_tests > -1:
            self.total_test_count += num_tests

            modified_total_test_count = str(self.total_test_count)
            if self.has_later_states:
                modified_total_test_count += "+"
            total_test_length = len(modified_total_test_count)

            for i in range(num_tests):
                result = self.get_result(i)

                if not result:
                    # Process has finished
                    break

                # Raw string: "\[" and "\]" are regex escapes, not Python string escapes
                test_name_parts = re.split(r'[#\[\]]', result["testName"])
                print("[" + str(self.next_test).rjust(total_test_length) + "/" + modified_total_test_count + "] " + CLASS_NAME_COLOUR_CODE + test_name_parts[0] + RESET_CODE + "#" + TEST_NAME_COLOUR_CODE + test_name_parts[1] + RESET_CODE, end='')
                self.next_test += 1

                if len(test_name_parts) > 2:
                    # 1 or more parameterizations - [2] will be a name, then every other one is empty
                    parameterizations = (test_name_parts[2::2])
                    for p in parameterizations:
                        print("[" + self.get_parameter_colour_code(p) + p + RESET_CODE + "]", end='')
                sys.stdout.flush()

                while not result["isFinished"]:
                    result = self.get_result(i)
                    # get_result returns None once the process has ended; stop polling
                    # instead of indexing into None on the next loop check
                    if result is None or not self.tests_are_running():
                        break

                if result:
                    if self.args.timer == "btest":
                        # Replace junit runtime with actual elapsed time
                        result["btestRunTime"] = time.monotonic_ns() - start_timer
                        start_timer = time.monotonic_ns()

                    self.print_result(result)

                    if self.args.stop_after_first_failure and str(result["result"]) == "1":
                        # Failure
                        self.kill()
                        raise KeyboardInterrupt

            debug(self.args, "Waiting for tests to stop running...")
            wait_to_end_timer = time.monotonic_ns()
            while self.tests_are_running():
                # 10000000000ns == 10 seconds
                if time.monotonic_ns() - wait_to_end_timer > 10000000000:
                    print("(Waited 10 seconds for test process to end. Killing)")
                    self.kill()
                    break
            debug(self.args, "Done")

            # NOTE(review): next_test carries over from earlier runs while num_tests is
            # for this run only - confirm this comparison is intended
            if self.next_test <= num_tests:
                # Tests are missing - probably something went wrong...
                print(">> ERROR: Expected " + str(num_tests) + " results but got " + str(self.next_test))
                self.dump_output()

        while not self.test_process_queue.empty():
            output = self.test_process_queue.get()
            if "(0 tests)" in output:
                debug(self.args, "[" + self.state.name() + "] No tests to run")
            if "Process crashed before executing the test" in output:
                print(output)
                sys.exit(1)

    def dump_output(self):
        """Print and drain every line currently queued from the device log."""
        while not self.test_process_queue.empty():
            output = self.test_process_queue.get()
            print(output)

    def tests_are_running(self):
        """Return the `ps` line(s) matching the recorded test pid; empty string if none."""
        output, err = execute_shell_command("Test", self.args, ["adb", "shell", "ps | grep $(cat /sdcard/test_pid.txt)"])
        return output.strip()

    def kill(self):
        """Kill the process whose pid is recorded in /sdcard/test_pid.txt."""
        execute_shell_command("Test", self.args, ["adb", "shell", "kill $(cat /sdcard/test_pid.txt)"])

    def get_num_tests(self):
        """Poll the results provider for the number of tests in this run.

        Returns:
            The test count, or -1 if the test process is not running.
        """
        numTests = -1
        while numTests == -1:
            if not self.tests_are_running():
                return -1

            output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/numTests"])
            if not output:
                continue # Not running yet?
            if "No result found" in output:
                continue
            numTests = int(output.split("tests=", 2)[1].strip())
        return numTests

    def get_result(self, i):
        """Poll the results provider for the result at index i.

        Returns:
            A dict with "index", "testName" and "isFinished"; finished results
            also carry "result", "message", "stackTrace" and "runTime".
            None if no result is available and the test process has ended.
        """
        result = None
        while not result:
            output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/" + str(i)])
            if not output:
                continue # Not running yet?
            if "No result found" in output:
                if self.get_num_tests() == -1:
                    # The process has ended
                    return None

                continue

            result = {}
            result["index"] = int(output.split("index=", 2)[1].split(",", 2)[0])
            result["testName"] = output.split("testName=", 2)[1].split(", result=", 2)[0]
            result["isFinished"] = output.split("isFinished=", 2)[1].strip() == "true"
            if result["isFinished"]:
                result["result"] = int(output.split("result=", 2)[1].split(",", 2)[0])
                result["message"] = output.split("message=", 2)[1].split(", stackTrace=", 2)[0]
                result["stackTrace"] = output.split("stackTrace=", 2)[1].split(", runTime=", 2)[0]
                result["runTime"] = int(output.split("runTime=", 2)[1].split(",", 2)[0])

        return result

    def get_parameter_colour_code(self, parameter):
        """Return the colour code for a parameterization, assigning the next one on first use."""
        if not parameter in self.parameter_colour_codes:
            self.parameter_colour_codes[parameter] = AVAILABLE_PARAMETER_COLOUR_CODES[self.available_parameter_colour_codes_pointer]
            # Wrap around so colours are reused once every available code has been handed out
            self.available_parameter_colour_codes_pointer = (self.available_parameter_colour_codes_pointer + 1) % len(AVAILABLE_PARAMETER_COLOUR_CODES)
        return self.parameter_colour_codes[parameter]

    def print_result(self, test_result):
        """Print the outcome of one test and record it in self.btest_run.

        Result codes handled: 0 passed, 1 failed, 2 ignored, 3 assumption
        failed. Any other code prints nothing and records nothing.

        If reading the result raises while the device is disconnected, this
        waits for reconnection and returns. Any other failure dumps the queued
        output and exits the process with status 1.
        """
        try:
            time_str = format_nanos(test_result["runTime"])
            if "btestRunTime" in test_result:
                time_str = time_str + "/" + format_nanos(test_result["btestRunTime"])
            if test_result["result"] == 0:
                self.btest_run.passed_tests.append(test_result)
                print(" ✅  " + PASSED_CODE + "PASSED" + RESET_CODE + " (" + time_str + ")", flush=True)
            elif test_result["result"] == 1:
                self.btest_run.failed_tests.append(test_result)
                print(" ❌  " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + time_str + ")\n\n" + test_result["stackTrace"] + "\n", flush=True)
            elif test_result["result"]  == 2:
                self.btest_run.ignored_tests.append(test_result)
                print(" " + IGNORED_CODE + "// IGNORED" + RESET_CODE + " (" + time_str + ")", flush=True)
            elif test_result["result"] == 3:
                self.btest_run.assumption_failed_tests.append(test_result)
                print(" " + IGNORED_CODE + "// ASSUMPTION FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + time_str + ")", flush=True)
            return
        except Exception as e:
            if not is_connected():
                print("\nDevice not connected to ADB. Waiting for reconnection...")
                wait_for_adb_reconnection()
                return
            print("Exception ", e)
        print("ERROR PARSING TEST RESULT " + str(test_result), flush=True)
        self.dump_output()
        sys.exit(1)
977
def format_nanos(nanos):
    """Format a nanosecond duration as milliseconds, seconds or minutes.

    Durations under 800ms are shown in ms, under 60s in seconds, otherwise in
    minutes, always with two decimals. Anything longer than 30 seconds is
    wrapped in FAILED_CODE / RESET_CODE.
    """
    millis = int(nanos) / 1000000
    secs = millis / 1000

    if millis < 800:
        text = f"{millis:.2f}ms"
    elif secs < 60:
        text = f"{secs:.2f}s"
    else:
        text = f"{secs / 60:.2f}m"

    if secs > 30:
        return FAILED_CODE + text + RESET_CODE
    return text
992
class BtestRun:
    """Collects the test results of a btest run, grouped by outcome."""

    def __init__(self):
        # One independent list per outcome
        (
            self.passed_tests,
            self.failed_tests,
            self.ignored_tests,
            self.assumption_failed_tests,
        ) = ([], [], [], [])
999
def execute_shell_command(stage, args, command, **extra_args):
    """Run a command, retrying after adb reconnection, and return (stdout, stderr).

    stage is only used as a label in debug output. extra_args are passed
    through to subprocess.run. When stderr reports that no device is found or
    that the device is offline, this blocks until adb reconnects and then runs
    the same command again.
    """
    while True:
        debug(args, "[" + stage + "] Executing '" + " ".join(command) + "'")
        completed = subprocess.run(command, capture_output=True, text=True, **extra_args)
        stdout = completed.stdout
        stderr = completed.stderr
        debug(args, "[" + stage + "] Output: '" + stdout + "' Err: '" + stderr + "'")

        device_unavailable = bool(stderr) and (
            "no devices/emulators found" in stderr or "device offline" in stderr
        )
        if not device_unavailable:
            return stdout, stderr

        print("\nDevice not connected to ADB. Waiting for reconnection...")
        wait_for_adb_reconnection()
1013
def wait_for_adb_reconnection():
    """Block until is_connected() reports a connection, checking every 0.5 seconds."""
    while not is_connected():
        time.sleep(0.5)
    print("\nFound connection")
1020
def is_connected():
    """Return True unless `adb shell echo hello` reports a missing or offline device."""
    probe = subprocess.run(["adb", "shell", "echo", "hello"], capture_output=True, text=True)
    stderr = probe.stderr

    if not stderr:
        return True
    return "no devices/emulators found" not in stderr and "device offline" not in stderr
1026
def get_or_create_additional_user(device_state, args):
    """Return the id of the second secondary user, creating users as needed.

    With two or more secondary users, the second lowest id is returned. With
    one, a new user is created and returned. With none, two users are created
    and the second is returned.
    """
    users = get_users(args)
    secondary_ids = sorted(
        user_id for user_id, info in users.items() if is_secondary_user(device_state, info)
    )

    if len(secondary_ids) >= 2:
        return secondary_ids[1]

    if not secondary_ids:
        # No secondary user at all: create that one first, the additional one follows
        create_user(device_state, args)
    return create_user(device_state, args)
1036
def get_or_create_secondary_user(device_state, args):
    """Return the lowest secondary user id, creating a user when none exists."""
    users = get_users(args)
    secondary_ids = sorted(
        user_id for user_id, info in users.items() if is_secondary_user(device_state, info)
    )
    return secondary_ids[0] if secondary_ids else create_user(device_state, args)
1043
def is_additional_user(device_state, user):
    """Return True if user is a secondary user other than the lowest-id secondary user."""
    if is_secondary_user(device_state, user):
        known_users = device_state["users"]
        secondary_ids = sorted(
            user_id for user_id in known_users.keys() if is_secondary_user(device_state, known_users[user_id])
        )
        return user["id"] != secondary_ids[0]

    return False
1050
def is_for_testing(device_state, user):
    """ Whether user may be used by tests. FOR_TESTING was introduced in SDK 34 - before that all users are assumed suitable """
    if device_state["sdk_version"] < 34:
        return True
    return "FOR_TESTING" in user["flags"]
1054
def is_secondary_user(device_state, user):
    """Return True if user counts as a secondary user for test purposes.

    The user must have type "full.SECONDARY". Unless user 0's type contains
    "HEADLESS", the user must also satisfy is_for_testing.
    """
    user_zero_type = device_state["users"][0]["type"]
    if "HEADLESS" not in user_zero_type:
        if not is_for_testing(device_state, user):
            return False
    # The initial user is still useful for secondary user tests even if non-for-testing on
    # headless as it's default
    return user["type"] == "full.SECONDARY"
1061
def is_clone_profile(device_state, user):
    """Return True if user satisfies is_for_testing and has type "profile.CLONE"."""
    if not is_for_testing(device_state, user):
        return False
    return user["type"] == "profile.CLONE"
1064
def is_private_profile(device_state, user):
    """Return True if user satisfies is_for_testing and has type "profile.PRIVATE"."""
    if not is_for_testing(device_state, user):
        return False
    return user["type"] == "profile.PRIVATE"
1067
def remove_user(args, id):
    """Remove the user with the given id from the device using `pm remove-user`."""
    removal_command = ["adb", "shell", "pm", "remove-user"]
    removal_command.append(str(id))
    execute_shell_command("Test", args, removal_command)
1070
def create_user(device_state, args):
    """Create a new user named "user" on the device, start it and return its id.

    Device admins are removed first (see ensure_no_dpcs). On SDK 34 and above
    the user is created with --for-testing.

    The id is taken from the last space-separated token of the `pm create-user`
    output. If that token is missing or is not a number (for example when the
    command printed an error instead), the output is reported and the process
    exits with status 1.
    """
    ensure_no_dpcs(device_state, args) # avoid no_add_user TODO: Be more specific
    commands = ["adb", "shell", "pm", "create-user"]

    if device_state["sdk_version"] >= 34:
        commands.append("--for-testing")

    commands.append("user")

    output, err = execute_shell_command("Test", args, commands)
    try:
        user_id = int(output.rsplit(" ", 1)[1].strip())
    except (IndexError, ValueError):
        # IndexError: no space in the output; ValueError: last token is not an integer
        print("Error parsing user id. Output: " + output + ", err: " + err)
        sys.exit(1)
    execute_shell_command("Test", args, ["adb", "shell", "am start-user " + str(user_id)])
    return user_id
1088
def create_work_profile(device_state, args, parent_id):
    """Create a managed profile of parent_id and return the new profile's id.

    Device admins are removed first (see ensure_no_dpcs). On SDK 34 and above
    the profile is created with --for-testing.

    The id is taken from the last space-separated token of the `pm create-user`
    output. If that token is missing or is not a number, the output is
    reported and the process exits with status 1.
    """
    ensure_no_dpcs(device_state, args) # avoid no_add_user TODO: Be more specific

    commands = ["adb", "shell", "pm", "create-user", "--managed", "--profileOf", str(parent_id)]

    if device_state["sdk_version"] >= 34:
        commands.append("--for-testing")

    commands.append("user")

    output, err = execute_shell_command("Test", args, commands)
    try:
        profile_id = int(output.rsplit(" ", 1)[1].strip())
    except (IndexError, ValueError):
        # IndexError: no space in the output; ValueError: last token is not an integer
        print("Error parsing profile id. Output: " + output + ", err: " + err)
        sys.exit(1)
    return profile_id
1106
def create_clone_profile(device_state, args, parent_id):
    """Create a clone profile of parent_id and return the new profile's id.

    Device admins are removed first (see ensure_no_dpcs). On SDK 34 and above
    the profile is created with --for-testing.

    The id is taken from the last space-separated token of the `pm create-user`
    output. If that token is missing or is not a number, the output is
    reported and the process exits with status 1.
    """
    ensure_no_dpcs(device_state, args) # avoid no_add_clone_profile TODO: Be more specific
    commands = ["adb", "shell", "pm", "create-user", "--profileOf", str(parent_id), "--user-type android.os.usertype.profile.CLONE"]
    if device_state["sdk_version"] >= 34:
        commands.append("--for-testing")
    commands.append("user")

    output, err = execute_shell_command("Test", args, commands)

    try:
        profile_id = int(output.rsplit(" ", 1)[1].strip())
    except (IndexError, ValueError):
        # IndexError: no space in the output; ValueError: last token is not an integer
        print("Error parsing profile id. Output: " + output + ", err: " + err)
        sys.exit(1)
    return profile_id
1122
def create_private_profile(device_state, args, parent_id):
    """Create a private profile of parent_id and return the new profile's id.

    Device admins are removed first (see ensure_no_dpcs). On SDK 34 and above
    the profile is created with --for-testing.

    The id is taken from the last space-separated token of the `pm create-user`
    output. If that token is missing or is not a number, the output is
    reported and the process exits with status 1.
    """
    ensure_no_dpcs(device_state, args) # avoid no_add_private_profile TODO: Be more specific
    commands = ["adb", "shell", "pm", "create-user", "--profileOf", str(parent_id), "--user-type android.os.usertype.profile.PRIVATE"]
    if device_state["sdk_version"] >= 34:
        commands.append("--for-testing")
    commands.append("user")

    output, err = execute_shell_command("Test", args, commands)

    try:
        profile_id = int(output.rsplit(" ", 1)[1].strip())
    except (IndexError, ValueError):
        # IndexError: no space in the output; ValueError: last token is not an integer
        print("Error parsing profile id. Output: " + output + ", err: " + err)
        sys.exit(1)
    return profile_id
1138
def gather_device_state(args):
    """Query the device and return a dict with current_user, users, sdk_version and features."""
    device_state = {}
    device_state["current_user"] = get_current_user(args)
    device_state["users"] = get_users(args)
    device_state["sdk_version"] = get_sdk_version(args)
    device_state["features"] = get_features(args)
    return device_state
1143
def get_features(args):
    """Return the device's features as reported by `pm list features`.

    The first 8 characters of every output line are dropped (presumably the
    "feature:" prefix - confirm against real output) and the rest is stripped.
    """
    listing = execute_shell_command("", args, ["adb", "shell", "pm", "list", "features"])[0]
    features = []
    for row in listing.split("\n"):
        features.append(row[8:].strip())
    return features
1146
def get_sdk_version(args):
    """Return the device's ro.build.version.sdk property as an int."""
    getprop_command = ["adb", "shell", "getprop", "ro.build.version.sdk"]
    raw_version = execute_shell_command("", args, getprop_command)[0]
    return int(raw_version.strip())
1149
def get_users(args):
    """Parse `cmd user list -v` into a dict of user dicts keyed by user id.

    Each user dict has "id", "flags" (a list), "type" and "parent" (the parent
    user id for users whose flags include PROFILE, otherwise None). The parent
    of every "profile.MANAGED" user additionally gets a "work_profile_id" key.
    The first line of the output is skipped.
    """
    listing, _ = execute_shell_command("Test", args, ["adb", "shell", "cmd user list -v"])

    users = {}
    for row in listing.split("\n")[1:]:
        if not row:
            continue

        user_id = int(row.split("id=", 2)[1].split(",", 2)[0])
        user_type = row.split("type=", 2)[1].split(",", 2)[0]
        flags = row.split("flags=", 2)[1].split(" ", 2)[0].split("|")
        if "PROFILE" in flags:
            parent = int(row.split("parentId=", 2)[1].split(")", 2)[0])
        else:
            parent = None

        users[user_id] = {"id": user_id, "flags": flags, "type": user_type, "parent": parent}

    # Link each work profile back to its parent user
    for profile in users.values():
        if profile["type"] != "profile.MANAGED":
            continue
        users[profile["parent"]]["work_profile_id"] = profile["id"]

    return users
1171
def get_current_user(args):
    """Return the id reported by `am get-current-user`; exit with status 1 if it cannot be parsed."""
    result = execute_shell_command("Test", args, ["adb", "shell", "am", "get-current-user"])
    try:
        raw_user = result[0]
        return int(raw_user.strip())
    except (IndexError, ValueError):
        print("Error parsing current user. Output: " + result[0] + " Err: " + result[1])
        sys.exit(1)
1179
def ensure_no_dpcs(device_state, args):
    """Remove the active admin listed for each user in `dumpsys device_policy`.

    For every "Enabled Device Admins (" section, the line after the header is
    read as the admin entry; when it is not blank, that admin is removed for
    the section's user with `dpm remove-active-admin` and the command result
    is printed. device_state is not used.
    """
    policy_dump = execute_shell_command("", args, ["adb", "shell", "dumpsys", "device_policy"])[0]

    sections = policy_dump.split("Enabled Device Admins (")[1:]
    for section in sections:
        user_line, device_admin_line, _remainder = section.split("\n", 2)
        admin_entry = device_admin_line.strip()
        if len(admin_entry) == 0:
            continue

        user_id = user_line.split(" ", 1)[1].split(",", 1)[0]
        admin_name = admin_entry.split(":", 1)[0]
        removal_result = execute_shell_command("", args, ["adb", "shell", "dpm", "remove-active-admin", "--user", user_id, admin_name])
        print(removal_result)
1189
def increase_maximum_user_limit(args):
    """Raise fw.max_users to the number of requested states when that exceeds 4.

    Restarts adb as root and waits for the device before setting the property.
    """
    default_maximum_users = 4
    requested_states = len(args.states)
    if requested_states <= default_maximum_users:
        return

    adb_steps = (
        ["adb", "root"],
        ["adb", "wait-for-device"],
        ["adb", "shell", "setprop", "fw.max_users", str(requested_states)],
    )
    for adb_step in adb_steps:
        execute_shell_command("Test", args, adb_step)
1196
1197
def run_tests(args):
    """Run every requested module in every applicable user state.

    Works out which states to run from the modules' declared STATES and
    args.states, moves the state the device is already in (if any) to the
    front, then runs one Test per module per state. args.headless is set here
    and args.states may be modified.

    Returns:
        A (btest_run, has_quit) tuple: the BtestRun holding all results and
        whether the run was interrupted by KeyboardInterrupt.
    """
    test = None
    btest_run = BtestRun()
    total_test_count = 0
    next_test = 1
    has_quit = False

    device_state = gather_device_state(args)
    # The device is treated as headless when user 0's type contains "HEADLESS"
    args.headless = "HEADLESS" in device_state["users"][0]["type"]

    ensure_correct_number_of_non_for_testing_users(args, device_state)

    if args.headless:
        if RUN_ON_CLONE_PROFILE in args.states:
            print("Not running on clone profile on headless")
            args.states.remove(RUN_ON_CLONE_PROFILE)
        if RUN_ON_PRIVATE_PROFILE in args.states:
            print("Not running on private profile on headless")
            args.states.remove(RUN_ON_PRIVATE_PROFILE)

    # TODO(b/298356062): Increase this only when there is a maximum user limit exception and
    # set the max-user limit to the default after the tests.
    increase_maximum_user_limit(args)

    # Construct modules with args

    # Collect the distinct state classes declared by the requested modules, then instantiate each
    states = set()
    for module in args.modules:
        for m in supported_modules[module][STATES]:
            states.add(m)
    states = [s(args) for s in states]

    # The additional-user state is only kept on headless devices
    if not args.headless:
        states = [t for t in states if not t.name() == "RUN_ON_ADDITIONAL_USER"]

    if not "android.software.device_admin" in device_state["features"]:
        filtered_states = []
        for t in states:
            if t.name() == "RUN_ON_WORK_PROFILE":
                print("device_admin not supported, skipping work profile state")
                continue
            filtered_states.append(t)
        states = filtered_states

    if "i" in args.states:
        args.states.remove("i")
        # Initial - replace with system or secondary
        if args.headless:
            args.states.append(RUN_ON_SECONDARY_USER)
        else:
            args.states.append(RUN_ON_SYSTEM_USER)

    # When the current-user state is requested alongside others, also request the
    # first state that is already active on the device
    if RUN_ON_CURRENT_USER in args.states and len(args.states) > 1:
        for state in states:
            if (state.is_active(device_state)):
                # Found current
                args.states.append(state.__class__)
                break

    # We calculate annotations before filtering so we properly exclude all
    all_include_annotations = []
    for state in states:
        all_include_annotations.extend(state.include_annotations(args))

    # Keep only the states that were actually requested
    states = [m for m in states if m.__class__ in args.states]

    first_state = None

    for state in states:
        if (state.is_active(device_state)):
            first_state = state
            state.initialise(device_state) # Entering a state we are already in
            break

    if first_state is None:
        # We are not in any state, enter the first one arbitrarily
        # NOTE(review): states[0] raises IndexError if no state survived the filtering above - confirm callers prevent that
        first_state = states[0]
        first_state.enter(device_state)

    # Move to start
    states.insert(0, states.pop(states.index(first_state)))
    # The first state has already been initialised or entered above
    needs_to_enter_state = False

    # Per module, the class/method filters from the targets (module-only targets contribute none)
    instrumented_runs = {}
    for module in args.modules:
        instrumented_runs[module] = [target[1] for target in args.targets if target[0] == module and target[1] is not None]

    if (args.root):
        execute_shell_command("Test", args, ["adb", "root"])
    else:
        execute_shell_command("Test", args, ["adb", "unroot"])

    try:
        for i, state in enumerate(states):
            print(state.name())
            debug(args, "[Test] Running tests for " + state.name())
            if needs_to_enter_state:
                state.enter(device_state)
            include_annotations = state.include_annotations(args)
            # Exclude every annotation some state includes unless this state supports it
            exclude_annotations = [x for x in all_include_annotations if not x in state.all_supported_annotations(args)]

            for module in instrumented_runs:
                # The last argument tells Test whether more states will run after this one
                test = Test(args, module, instrumented_runs[module], state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, (i < len(states) - 1))
                test.run()
                # Carry the running counters forward into the next Test
                total_test_count = test.total_test_count
                next_test = test.next_test
            needs_to_enter_state = True
    except KeyboardInterrupt:
        # Kill the test process then move on to print the results
        if test is not None:
            test.kill()
        has_quit = True
    except Exception as e:
        # Make sure the on-device test process is stopped before propagating the error
        if test is not None:
            test.kill()
        raise e

    return btest_run, has_quit
1316
def ensure_correct_number_of_non_for_testing_users(args, device_state):
    """Remove non-for-testing users beyond the supported number.

    Walking users in ascending id order, the first one (two on headless)
    that does not satisfy is_for_testing is kept and any further ones are
    removed. If anything was removed, device_state["users"] is refreshed.
    """
    supported_count = 2 if args.headless else 1
    known_users = device_state["users"]

    # Lazily yields the non-for-testing ids so checks and removals stay interleaved
    non_for_testing_ids = (
        user_id
        for user_id in sorted(known_users.keys())
        if not is_for_testing(device_state, known_users[user_id])
    )

    removed_any = False
    for position, user_id in enumerate(non_for_testing_ids):
        if position < supported_count:
            continue
        print("Removing user " + str(user_id) + " as exceeded supported non-for-testing users")
        remove_user(args, user_id)
        removed_any = True

    if removed_any:
        device_state["users"] = get_users(args)
1334
def main():
    """Entry point: build, install, then (when args.test is set) run the tests.

    After each run the test names are written, one per line, to
    btest_passes.txt, btest_failures.txt, btest_ignored.txt and
    btest_assumption_failed.txt under $OUT, and a summary is printed. The run
    is repeated while there are no failures, args.rerun_after_all_pass is set
    and the run was not interrupted.
    """
    args = get_args()

    build_modules(args)

    install(args)

    if not args.test:
        return

    while True:
        btest_run, has_quit = run_tests(args)

        out = os.environ["OUT"]
        result_files = (
            ("/btest_passes.txt", btest_run.passed_tests),
            ("/btest_failures.txt", btest_run.failed_tests),
            ("/btest_ignored.txt", btest_run.ignored_tests),
            ("/btest_assumption_failed.txt", btest_run.assumption_failed_tests),
        )
        for file_name, results in result_files:
            with open(out + file_name, "w") as o:
                for test in results:
                    o.write(test["testName"] + "\n")

        summary_parts = [
            "\n" + PASSED_CODE + "Passed: " + str(len(btest_run.passed_tests)) + RESET_CODE,
            "," + FAILED_CODE + " Failed: " + str(len(btest_run.failed_tests)) + RESET_CODE,
            "," + IGNORED_CODE + " Ignored: " + str(len(btest_run.ignored_tests)) + RESET_CODE,
            ", " + IGNORED_CODE + "Assumption Failed: " + str(len(btest_run.assumption_failed_tests)) + RESET_CODE,
        ]
        print("".join(summary_parts))

        if len(btest_run.failed_tests) > 0:
            print("\n\nFailures:")
            for test_result in btest_run.failed_tests:
                print(test_result["testName"] + " ❌  " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
            return

        # No failures
        if not args.rerun_after_all_pass or has_quit:
            return

        print("All passed. rerun-after-all-pass specified. Rerunning...")
1376
def debug(args, msg):
    """Print msg only when args.debug is set."""
    if not args.debug:
        return
    print(msg)
1380
# Run main() only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()