1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, gobject, os, sys
6
7import common
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.common_lib.cros import session_manager
10from autotest_lib.client.cros import ownership
11
12"""Utility class for tests that generate, push and fetch policies.
13
14As the python bindings for the protobufs used in policies are built as a part
15of tests that use them, callers must pass in their location at call time."""
16
17
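def install_protobufs(autodir, job):
    """Install the policy protobuf dependency and put it on the import path.

    After calling this, policy pb2 modules can be imported directly,
    e.g. ``import chrome_device_policy_pb2``.  The dependency directory is
    appended to sys.path (searched after every existing entry, so it cannot
    shadow the standard library) and is added only once, so repeated calls
    from several places in a test are harmless.

    @param autodir: Autotest directory (usually the caller's self.autodir).
    @param job: Job instance (usually the caller's self.job); its
                install_pkg(name, 'dep', dest_dir) is used to fetch the
                package into dest_dir.
    """
    # TODO(crbug.com/807950): Change the installation process so that policy
    #                         proto imports can be moved to the top.
    dep = 'policy_protos'
    dep_dir = os.path.join(autodir, 'deps', dep)
    job.install_pkg(dep, 'dep', dep_dir)
    # Append, never insert(0, ...): a test-supplied directory must not take
    # precedence over the interpreter's own modules.  Guard against duplicate
    # entries so that calling this more than once does not grow sys.path.
    if dep_dir not in sys.path:
        sys.path.append(dep_dir)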
33
34
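def compare_policy_response(policy_response, owner=None, guests=None,
                            new_users=None, roaming=None, whitelist=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Every check is delegated to an ownership.assert_* helper, which reports
    a mismatch by raising; this function has no return value of its own.
    Only the embedded fields are inspected here -- nothing in this function
    verifies policy_data_signature or new_public_key_signature.

    Only truthy arguments are checked: passing False (or an empty whitelist)
    skips the corresponding check instead of asserting that the setting is
    off.  Use None for "don't care".

    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf (not a bare PolicyData).
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in; when
                      given, allow_new_users is also required to be False.

    @return None.  Mismatches surface as exceptions, see @raises.

    @raises whatever the ownership.assert_* helpers raise on a mismatch
            (error.TestFail in this library -- confirm against ownership.py),
            or a protobuf decode error if |policy_response| is not a valid
            serialized PolicyFetchResponse.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Outer layer: the fetch response, which carries the serialized PolicyData.
    response_proto = device_management_backend_pb2.PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    # Middle layer: PolicyData, which names the owner and carries the
    # serialized device settings.
    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    if owner:
        ownership.assert_username(data_proto, owner)

    # Inner layer: the device settings themselves.
    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    if guests:
        ownership.assert_guest_setting(settings, guests)
    if new_users:
        ownership.assert_show_users(settings, new_users)
    if roaming:
        ownership.assert_roaming(settings, roaming)
    if whitelist:
        # A whitelist is only meaningful if unlisted users are locked out,
        # so both settings are checked together.
        ownership.assert_new_users(settings, False)
        ownership.assert_users_on_whitelist(settings, whitelist)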
73
74
def build_policy_data(owner=None, guests=None, new_users=None, roaming=None,
                      whitelist=None):
    """Generate and serialize a populated device policy protobuffer.

    Creates a PolicyData protobuf, with an embedded
    ChromeDeviceSettingsProto, containing the information passed in.
    Only truthy arguments are written into the settings: passing False (or
    an empty whitelist) leaves the corresponding field unset rather than
    explicitly disabling it.  The result is unsigned; see generate_policy()
    for wrapping it in a signed PolicyFetchResponse.

    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in; when
                      given, allow_new_users is also set to False.

    @return serialization of the PolicyData proto that we build.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Innermost layer first: the device settings.
    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    if guests:
        settings.guest_mode_enabled.guest_mode_enabled = guests
    if new_users:
        settings.show_user_names.show_user_names = new_users
    if roaming:
        settings.data_roaming_enabled.data_roaming_enabled = roaming
    if whitelist:
        # A whitelist only restricts anything if unlisted users are also
        # locked out, so the two settings are always written together.
        settings.allow_new_users.allow_new_users = False
        settings.user_whitelist.user_whitelist.extend(whitelist)

    # Outer layer: PolicyData naming the owner and carrying the settings.
    policy = device_management_backend_pb2.PolicyData()
    policy.policy_type = ownership.POLICY_TYPE
    if owner:
        policy.username = owner
    policy.policy_value = settings.SerializeToString()
    return policy.SerializeToString()
111
112
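def generate_policy(key, pubkey, policy, old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|.  Also includes the public key |pubkey|, signed with |old_key|
    if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    is serialized to a string and returned.

    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated PolicyFetchResponse
                   (normally the output of build_policy_data()).
    @param old_key: if provided, this implies the generated PolicyFetchResponse
                    is intended to represent a key rotation.  pubkey will be
                    signed with this key before embedding, so that a consumer
                    still holding the old key can accept the new one.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    import device_management_backend_pb2

    # No rotation: the new public key is simply self-signed with |key|.
    # Identity comparison -- the previous ``== None`` would invoke the key
    # object's __eq__, which is both slower and not guaranteed to be sane.
    if old_key is None:
        old_key = key
    response = device_management_backend_pb2.PolicyFetchResponse()
    response.policy_data = policy
    response.policy_data_signature = ownership.sign(key, policy)
    response.new_public_key = pubkey
    # The previous key (or |key| itself when there is no rotation) vouches
    # for the new public key.
    response.new_public_key_signature = ownership.sign(old_key, pubkey)
    return response.SerializeToString()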
141
142
def push_policy_and_verify(policy_string, sm):
    """Push a device policy to the session manager over DBus.

    The serialized device policy |policy_string| is sent to the session
    manager with the StorePolicyEx DBus call.  Success of the store is
    validated by fetching the policy again and comparing.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if policy push failed.
    """
    # Subscribe to the new-policy signal before issuing the store so that a
    # signal emitted while StorePolicyEx is in flight is not missed
    # (presumed intent of this ordering; keep these calls in this order).
    listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
    listener.listen_for_new_policy()
    descriptor = session_manager.make_device_policy_descriptor()
    # byte_arrays=True asks dbus-python to keep byte arrays as byte strings
    # instead of converting them into lists of individual bytes.
    sm.StorePolicyEx(descriptor,
                     dbus.ByteArray(policy_string), byte_arrays=True)
    listener.wait_for_signals(desc='Policy push.')

    # Read the policy back and require a byte-for-byte match, so a store
    # that the session manager rejected or altered cannot pass silently.
    retrieved_policy = sm.RetrievePolicyEx(descriptor, byte_arrays=True)
    if retrieved_policy != policy_string:
        # NOTE: the failure message embeds the raw serialized blob that was
        # read back, which is a binary protobuf rather than readable text.
        raise error.TestFail('Policy should not be %s' % retrieved_policy)
165
166
def get_policy(sm):
    """Fetch the current device policy from the session manager over DBus.

    Counterpart of push_policy_and_verify(): returns whatever the session
    manager currently holds, without any comparison or signature check.

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse, as a byte string (byte_arrays=True
            keeps dbus-python from returning a list of bytes).
    """
    descriptor = session_manager.make_device_policy_descriptor()
    return sm.RetrievePolicyEx(descriptor, byte_arrays=True)
178