1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, gobject, sys
6
7import common
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.common_lib.cros import session_manager
10from autotest_lib.client.cros import ownership
11
12
13"""Utility class for tests that generate, push and fetch policies.
14
15As the python bindings for the protobufs used in policies are built as a part
16of tests that use them, callers must pass in their location at call time."""
17
18
def compare_policy_response(proto_binding_location, policy_response,
                            owner=None, guests=None, new_users=None,
                            roaming=None, whitelist=None, proxies=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Every optional argument is checked only when it is truthy; passing
    None (the default), False, or an empty container skips that check.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in.
    @param proxies: dictionary - { 'proxy_mode': <string> }

    This function has no return value.  All verification is delegated to
    the ownership.assert_* helpers (presumably they raise on mismatch;
    see that module for the exact failure behavior).
    """
    # Pull in protobuf bindings.  Guard the append so that repeated calls
    # do not keep growing sys.path with duplicate entries.
    if proto_binding_location not in sys.path:
        sys.path.append(proto_binding_location)
    from device_management_backend_pb2 import PolicyFetchResponse
    from device_management_backend_pb2 import PolicyData
    from chrome_device_policy_pb2 import ChromeDeviceSettingsProto

    # Outer layer: the fetch response wrapping the serialized policy data.
    response_proto = PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    # Middle layer: the policy data wrapping the serialized device settings.
    data_proto = PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    if owner:
        ownership.assert_username(data_proto, owner)

    # Inner layer: the device settings themselves.
    settings = ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    if guests:
        ownership.assert_guest_setting(settings, guests)
    if new_users:
        ownership.assert_show_users(settings, new_users)
    if roaming:
        ownership.assert_roaming(settings, roaming)
    if whitelist:
        # A whitelist is expected to come with allow_new_users == False,
        # mirroring what build_policy_data() writes.
        ownership.assert_new_users(settings, False)
        ownership.assert_users_on_whitelist(settings, whitelist)
    # Proxy settings are independent of the whitelist, so they are checked
    # whether or not a whitelist was supplied.
    if proxies:
        ownership.assert_proxy_settings(settings, proxies)
70
71
def build_policy_data(proto_binding_location, owner=None, guests=None,
                      new_users=None, roaming=None, whitelist=None,
                      proxies=None):
    """Generate and serialize a populated device policy protobuffer.

    Creates a PolicyData protobuf, with an embedded
    ChromeDeviceSettingsProto, containing the information passed in.

    Every optional argument is written into the protobuf only when it is
    truthy; None (the default), False, or an empty container leaves the
    corresponding field unset.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in.
    @param proxies: dictionary - { 'proxy_mode': <string> }

    @return serialization of the PolicyData proto that we build.
    """
    # Pull in protobuf bindings.  Guard the append so that repeated calls
    # do not keep growing sys.path with duplicate entries.
    if proto_binding_location not in sys.path:
        sys.path.append(proto_binding_location)
    from device_management_backend_pb2 import PolicyData
    from chrome_device_policy_pb2 import ChromeDeviceSettingsProto

    data_proto = PolicyData()
    data_proto.policy_type = ownership.POLICY_TYPE
    if owner:
        data_proto.username = owner

    settings = ChromeDeviceSettingsProto()
    if guests:
        settings.guest_mode_enabled.guest_mode_enabled = guests
    if new_users:
        settings.show_user_names.show_user_names = new_users
    if roaming:
        settings.data_roaming_enabled.data_roaming_enabled = roaming
    if whitelist:
        # Supplying a whitelist also turns off allow_new_users.
        settings.allow_new_users.allow_new_users = False
        settings.user_whitelist.user_whitelist.extend(whitelist)
    if proxies:
        settings.device_proxy_settings.proxy_mode = proxies['proxy_mode']

    # The settings proto is embedded in serialized form inside PolicyData.
    data_proto.policy_value = settings.SerializeToString()
    return data_proto.SerializeToString()
121
122
def generate_policy(proto_binding_location, key, pubkey, policy, old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|.  Also includes the public key |pubkey|, signed with |old_key|
    if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    is serialized to a string and returned.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated PolicyFetchResponse.
    @param old_key: if provided, this implies the generated PolicyFetchResponse
                    is intended to represent a key rotation.  pubkey will be
                    signed with this key before embedding.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    # Pull in protobuf bindings.  Guard the append so that repeated calls
    # do not keep growing sys.path with duplicate entries.
    if proto_binding_location not in sys.path:
        sys.path.append(proto_binding_location)
    from device_management_backend_pb2 import PolicyFetchResponse

    # Without an explicit old key this is not a rotation: the new public
    # key is signed with the same key that signs the policy.
    if old_key is None:
        old_key = key

    policy_proto = PolicyFetchResponse()
    policy_proto.policy_data = policy
    policy_proto.policy_data_signature = ownership.sign(key, policy)
    policy_proto.new_public_key = pubkey
    policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    return policy_proto.SerializeToString()
155
156
def push_policy_and_verify(policy_string, sm):
    """Store a device policy via the session manager and confirm it stuck.

    |policy_string| is wrapped in a dbus.ByteArray and handed to the
    session manager's StorePolicy DBus call.  A signal listener is armed
    before the call and waited on afterwards; the policy is then fetched
    back with RetrievePolicy and compared against what was pushed.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if the retrieved policy differs from
            |policy_string|.
    """
    # Arm the listener first so the new-policy signal cannot be missed.
    main_loop = gobject.MainLoop()
    signal_listener = session_manager.OwnershipSignalListener(main_loop)
    signal_listener.listen_for_new_policy()

    wire_policy = dbus.ByteArray(policy_string)
    sm.StorePolicy(wire_policy, byte_arrays=True)
    signal_listener.wait_for_signals(desc='Policy push.')

    # Read the policy back and make sure it round-tripped unchanged.
    stored_policy = sm.RetrievePolicy(byte_arrays=True)
    if stored_policy != policy_string:
        failure_message = 'Policy should not be %s' % stored_policy
        raise error.TestFail(failure_message)
177
178
def get_policy(sm):
    """Fetch the device policy from the session manager over DBus.

    A thin counterpart to push_policy_and_verify(): it simply issues the
    RetrievePolicy call with byte_arrays=True and hands back the result.

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse.
    """
    serialized_response = sm.RetrievePolicy(byte_arrays=True)
    return serialized_response
189