1import json
2import logging
3
4from autotest_lib.client.common_lib import error
5from autotest_lib.client.cros.enterprise import enterprise_policy_utils
6from autotest_lib.client.cros.enterprise.policy_group import AllPolicies
7
8
9CHROMEPOLICIES = 'chromePolicies'
10DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
11EXTENSIONPOLICIES = 'extensionPolicies'
12
13CHROMEOS_CLOUDDPC = {
14    'AudioCaptureAllowed': 'unmuteMicrophoneDisabled',
15    'DefaultGeolocationSetting': 'shareLocationDisabled',
16    'DeviceBlockDevmode': 'debuggingFeaturesDisabled',
17    'DisableScreenshots': 'screenCaptureDisabled',
18    'ExternalStorageDisabled': 'usbFileTransferDisabled',
19    'VideoCaptureAllowed': 'cameraDisabled',
20}
21
22
23class Policy_Manager(object):
24
25    def __init__(self, username=None, fake_dm_server=None):
26        """
27        This class is to hanlde:
28            Setting policies to a Fake DM server
29            Obtaining policies from a DUT
30            Obtaining clouddpc policy settings.
31            Obtinaing both set/received policies in many different data formats
32            Verifying policies provided to the DUT are correct.
33
34        It has been designed so all the features are independent, meaning a
35        fake_dm_server is not required to obtain policies from the DUT, get
36        clouddpc values, etc.
37
38        @params username: username to be used when creating the fake DM policy.
39        @params fake_dm_server: The fake DM server object.
40
41        """
42        self._configured = AllPolicies(True)
43        self._obtained = AllPolicies()
44        self.CHROME = 'chrome'
45        self.LOCAL = 'local'
46        self.EXT = 'ext'
47        self.username = username
48        self.fake_dm_server = fake_dm_server
49        self.autotest_ext = None
50
51        # If a fake_dm_sever is provided, enabled auto_update by default.
52        # Can be turned off if desired.
53        self._auto_updateDM = bool(fake_dm_server)
54
55    def configure_policies(self,
56                           user={},
57                           suggested_user={},
58                           device={},
59                           extension={},
60                           new=True):
61        """
62        Used to configure desired policies on the DUT. Will also save the
63        configured policies.
64
65        If a fake_dm_server was provided on class initialization, and
66        auto_updateDM is not False, this will also update the fake_dm_server.
67
68        @params user/suggested_user/device/extension: dict of the policies to
69            be set. extension must be provided with the extension id. e.g.
70            {'ExtensionID': {policy_dict}}
71        @param new: bool, if True clear all previously configured policies.
72
73        """
74        if new:
75            self._configured = AllPolicies(True)
76
77        self._configured.set_policy('chrome', user, 'user')
78        self._configured.set_policy('chrome', suggested_user, 'suggested_user')
79        self._configured.set_policy('chrome', device, 'device')
80
81        self._configured.set_extension_policy(extension)
82
83        if self.auto_updateDM:
84            self.updateDMServer()
85
86    def configure_extension_visual_policy(self,
87                                          ext_policy={},
88                                          new=True):
89        """
90        Extensions are... different. The policy set for them often is not the
91        actual policy, but a pointer to where the policy data is. This makes
92        verifying the extension policy tricky.
93
94        To help with, this function will allow you to set the 'real' policy.
95        Important things to note: This is only useful for verifying the policy,
96        and getting the policy as a dictionary (which has a flag for which
97        style of extension policies you want to see. The DM server will not be
98        set via this.
99
100        @param ext_policy: dict, Extension policy must be provided with the
101            extension id:
102                {'ExtensionID': {policy_dict}}.
103        @param new: bool, if True will erase any previously stored VISUAL
104            extension data.
105
106        """
107        if new:
108            self._configured.ext_values = {}
109
110        self._configured.set_extension_policy(ext_policy, True)
111
112    def remove_policy(self,
113                      policy_name,
114                      policy_type,
115                      extID=None):
116        """
117        Removes the policy from the configured policies. Useful when you want
118        clear a specific policy, but leave the other policies untouched.
119        If auto_updateDM is True (thus a fake_dm_server has been provided), the
120        dm server will be updated.
121
122        @param policy_name: The policy name
123        @param policy_type: The type of policy it is. Valid types:
124            "user", "device", "extension", "suggested_user".
125        @param extID: The extension ID, if removing an extension policy.
126
127        """
128        if policy_type != 'extension':
129            self._removeChromePolicy(policy_name)
130        else:
131            self._removeExtensionPolicy(policy_name, extID)
132
133        if self.auto_updateDM:
134            self.updateDMServer()
135
136    def _removeChromePolicy(self, policy_name):
137        """
138        Attempts to remove the specified extension policy from specified
139        extension.
140
141        @rasies error.TestError: If the policy is not in the configured
142            policies.
143
144        """
145        try:
146            del self._configured.chrome[policy_name]
147        except KeyError:
148            raise error.TestError('Policy {} missing from chrome policies.'
149                                  .format(policy_name))
150
151    def _removeExtensionPolicy(self, policy_name, extID):
152        """
153        Attempts to remove the specified extension policy from specified
154        extension.
155
156        @raises error.TestError: if the policy_type is an 'extension', but the
157            extID is not provided, or the policy is not found in the extension.
158
159        """
160        if not extID:
161            raise error.TestError(
162                'Cannot delete extension policy without extension ID')
163        try:
164            del self._configured.extension_configured_data[extID][policy_name]
165        except KeyError:
166            raise error.TestError(
167                'Policy {} missing from extension policies.'
168                .format(policy_name))
169
170    def obtain_policies_from_device(self, autotest_ext=None):
171        """
172        Calls the autotest private getAllEnterprisePolicies() API, and saves
173        the response.
174
175        @param autotest_ext: The autotest browser extension.
176
177        """
178        if autotest_ext:
179            self.autotest_ext = autotest_ext
180        if not self.autotest_ext:
181            raise error.TestError('Cannot obtain policies without autotest_ext')
182        self.raw_data = enterprise_policy_utils.get_all_policies(
183            self.autotest_ext)
184        self._obtained.set_policy(self.CHROME, self.raw_data[CHROMEPOLICIES])
185        self._obtained.set_policy(self.LOCAL, self.raw_data[DEVICELOCALACCOUNT])
186        self._obtained.set_extension_policy(self.raw_data[EXTENSIONPOLICIES])
187
188    def verify_policy(self, policyName, policy_value, extID=None):
189        """Verifies the configured policies are == to the policies obtained."""
190        recieved_value = self.get_policy_value_from_DUT(policyName=policyName,
191                                                        extID=extID,
192                                                        refresh=True)
193        if not recieved_value == policy_value:
194            raise error.TestError(
195                'Policy {} value was not set correctly. \nExpected:\t {}'
196                '\nReceived: \t'.format(policyName,
197                                        policy_value,
198                                        recieved_value))
199        logging.info('Policy verification successful')
200
201    def verify_policies(self):
202        """Verifies the configured policies are == to the policies obtained."""
203        if not self._configured == self._obtained:
204            raise error.TestError(
205                'Configured policies did not match policies received from DUT.')
206        logging.info('Policy verification successful')
207
208    def get_policy_value_from_DUT(self, policyName, extID=None, refresh=False):
209        """
210        Get the value of a specified policy from the DUT. If the policy is from
211        an extension, an extension ID (extID) must be provided.
212
213        @param policyName: str, the name of the policy.
214        @param extID: The ID of the extension.
215        @param refresh: bool, if you want to get the policies from the DUT again
216            Note: This does NOT force the DUT to re-obtain the policies from the
217            DM server.
218
219        @returns: The value of the policy if found, else None.
220        """
221        if refresh:
222            # Uses the previously provided autotest Extension.
223            self.obtain_policies_from_device()
224
225        if extID:
226            if policyName in self._obtained.extension_configured_data[extID]:
227                return (
228                    self._obtained.extension_configured_data[extID][policyName]
229                        .value)
230        elif policyName in self._obtained.chrome:
231            return self._obtained.chrome[policyName].value
232        return None
233
234    def updateDMServer(self):
235        """Updates the Fake DM server with the current configured policy."""
236        fake_dm_json = self.getDMConfig()
237        logging.info('Policy blob {}'.format(fake_dm_json))
238        if not self.fake_dm_server:
239            raise error.TestError(
240                'Cannot update DM server. DM server not provided')
241        self.fake_dm_server.setup_policy(fake_dm_json)
242
243    def getDMConfig(self, refresh=True):
244        """"
245        Creates the DM configuration (aka Json blob) to be used by the
246        fake DM server
247
248        @param refresh: bool, if True, will clear any previous configuration.
249            If False, return the currently configured DM blob.
250
251        """
252        if refresh:
253            self._configured.createNewFakeDMServerJson()
254        self._configured.updateDMJson()
255        self._configured._DMJSON['policy_user'] = self.username
256        return json.dumps(self._configured._DMJSON)
257
258    def getCloudDpc(self):
259        """Gets the Cloud DPC ARC policy settings."""
260        expected_cloud_dpc_settings = {}
261        if self._arc_certs():
262            self._add_arc_certs(expected_cloud_dpc_settings)
263        self._add_shared_policies(expected_cloud_dpc_settings)
264        self._add_shared_arc_policy(expected_cloud_dpc_settings)
265
266        return expected_cloud_dpc_settings
267
268    def _arc_certs(self):
269        """
270        Returns True if ArcCertificatesSyncMode is set in the configured
271        policies and the bool(value) is True, else False.
272
273        """
274        if ('ArcCertificatesSyncMode' in self._configured.chrome and
275                self._configured.chrome['ArcCertificatesSyncMode'].value):
276            return True
277        return False
278
279    def _add_shared_arc_policy(self, dpc):
280        """Adds the shared policies that are subset within the 'ArcPolicy'."""
281        Arc_Policy = self._configured.chrome.get('ArcPolicy', {})
282        if Arc_Policy:
283            Arc_Policy = Arc_Policy.value
284
285        for key in ['applications', 'accountTypesWithManagementDisabled']:
286            if key in Arc_Policy:
287                dpc[key] = Arc_Policy[key]
288
289    def _add_shared_policies(self, dpc):
290        """
291        Add all of the configured policies that are shared with arc clouddpc,
292        to the "dpc" dict, with the clouddpc key. If the policy is not set, it
293        will not be added.
294
295        """
296        for policy_name, dpc_name in CHROMEOS_CLOUDDPC.items():
297            if policy_name in self._configured.chrome:
298                dpc[dpc_name] = self._configured.chrome[policy_name].value
299
300    def _add_arc_certs(self, dpc):
301        open_network_config = 'OpenNetworkConfiguration'
302        if open_network_config in self._configured.chrome:
303            dpc['caCerts'] = self._configured.chrome[open_network_config].value
304        else:
305            dpc['caCerts'] = None
306
307    def get_configured_policies_as_dict(self, visual=False):
308        """ Returns the configured policies as a dict."""
309        return self._configured.get_policy_as_dict(visual)
310
311    def get_obtained_policies_as_dict(self):
312        """ Returns the obtained policies as a dict."""
313        return self._obtained.get_policy_as_dict(visual=True)
314
315    @property
316    def auto_updateDM(self):
317        """ Returns the current state of the auto_updateDM setting."""
318        return self._auto_updateDM
319
320    @auto_updateDM.setter
321    def auto_updateDM(self, value):
322        """Turns on/off auto updating of the DM server."""
323        if not isinstance(value, bool):
324            raise error.TestError('Auto Update DM must be bool, got {}'
325                                   .format(value))
326        if value and not self.fake_dm_server:
327            raise error.TestError(
328                'Cannot autoupdate without the Fake DM server configured.')
329        self._auto_updateDM = value
330