1# Copyright 2019 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import json
5from mock import patch
6import unittest
7
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.cros.enterprise import policy_group
11from autotest_lib.client.cros.enterprise import policy_manager
12
13"""
14This is the unittest file for policy_manager.py.
15If you modify that file, you should be at minimum re-running this file.
16
17Add and correct tests as changes are made to the utils file.
18
19To run the tests, use the following command from your DEV box (outside_chroot):
20
21src/third_party/autotest/files/utils$ python unittest_suite.py \
22autotest_lib.client.cros.enterprise.policy_manager_unittest --debug
23
24"""
25FX_NAME = '_get_pol_from_api'
26PATCH_BASE = 'autotest_lib.client.cros.enterprise.enterprise_policy_utils'
27PATCH_PATH = '{}.{}'.format(PATCH_BASE, FX_NAME)
28
29
30class TestPolicyManager(unittest.TestCase):
31
32    def configPolicyManager(self, username=None):
33        self.policy_manager = policy_manager.Policy_Manager(username)
34        self.policy_manager.autotest_ext = 'Demo'
35        self.maxDiff = None
36        self.expected = {'deviceLocalAccountPolicies': {},
37                         'extensionPolicies': {
38                             'ExtID1':
39                                 {'Extension Policy 1':
40                                     {'scope': 'user',
41                                      'level': 'mandatory',
42                                      'value': 'EP1',
43                                      'source': 'cloud'}}},
44                         'chromePolicies':
45                             {'Suggested Policy 1':
46                                 {'scope': 'user',
47                                  'level': 'recommended',
48                                  'value': 'Suggested 1',
49                                  'source': 'cloud'},
50                              'Test Policy1':
51                                 {'scope': 'user',
52                                  'level': 'mandatory',
53                                  'value': 'Policy1',
54                                  'source': 'cloud'},
55                               'Device Policy1':
56                                 {'scope': 'machine',
57                                  'level': 'mandatory',
58                                  'value': 'Device 1', 'source': 'cloud'}}}
59
60    def test_Defaults(self):
61        self.configPolicyManager()
62        self.assertEqual(type(self.policy_manager._configured),
63                         policy_group.AllPolicies)
64        self.assertEqual(type(self.policy_manager._obtained),
65                         policy_group.AllPolicies)
66
67    def test_configure_policies_and_get_configured_as_dict(self):
68        self.configPolicyManager()
69        self.policy_manager.configure_policies(
70            user={'Test Policy1': 'Policy1'},
71            device={'Device Policy1': 'Device 1'},
72            suggested_user={'Suggested Policy 1': 'Suggested 1'},
73            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
74        configured = self.policy_manager.get_configured_policies_as_dict()
75        self.assertEqual(configured, self.expected)
76
77    @patch(PATCH_PATH)
78    def test_obtain_policies_from_device_and_get_obtained_policies_as_dict(
79            self, get_pol_mock):
80        self.configPolicyManager()
81        get_pol_mock.return_value = self.expected
82        self.policy_manager.obtain_policies_from_device()
83        received = self.policy_manager.get_obtained_policies_as_dict()
84        self.assertEqual(received, self.expected)
85
86    def test_changing_policy(self):
87        """Test setting a policy to True, then changing the value"""
88        self.policy_manager = policy_manager.Policy_Manager()
89
90        self.policy_manager.configure_policies(user={'TP1': False})
91        configured = self.policy_manager.get_configured_policies_as_dict()
92        self.assertEqual(configured['chromePolicies']['TP1']['value'], False)
93
94        # Now set the policy to False
95        self.policy_manager.configure_policies(user={'TP1': True}, new=False)
96        configured = self.policy_manager.get_configured_policies_as_dict()
97        self.assertEqual(configured['chromePolicies']['TP1']['value'], True)
98
99    def test_remove_policy(self):
100        """
101        Will test remove_policy and the private methods:
102            _removeChromePolicy
103            _removeExtensionPolicy
104
105        """
106        # Setup the policy Manager
107        self.policy_manager = policy_manager.Policy_Manager()
108        self.policy_manager.configure_policies(
109            user={'Test Policy1': 'Policy1', 'Test Policy2': 'Policy2'})
110
111        # Remove one policy. Verify it is gone, but the other is there.
112        self.policy_manager.remove_policy('Test Policy2', 'user')
113        self.assertNotIn('Test Policy2', self.policy_manager._configured.chrome)
114        self.assertIn('Test Policy1', self.policy_manager._configured.chrome)
115
116        # Add an extension Policy. Verify it is there, then remove and verify.
117        self.policy_manager.configure_policies(extension={'ID1': {'P1': 'V1'}},
118                                               new=False)
119
120        self.assertIn('ID1', self.policy_manager._configured.extension_configured_data)
121        self.assertIn('P1', self.policy_manager._configured.extension_configured_data['ID1'])
122        self.policy_manager.remove_policy('P1', 'extension', extID='ID1')
123        self.assertNotIn('P1', self.policy_manager._configured.extension_configured_data['ID1'])
124
125        # Attempt to remove non-existant policies. Verify an error is raised
126        with self.assertRaises(error.TestError) as context:
127            self.policy_manager.remove_policy('Test Policy2', 'user')
128        self.assertEqual(str(context.exception),
129                         'Policy Test Policy2 missing from chrome policies.')
130
131        # Attempt to remove an extension policy without an ID and non-existant
132        # policy in the valid extension
133        with self.assertRaises(error.TestError) as context:
134            self.policy_manager.remove_policy('P1', 'extension')
135        self.assertEqual(str(context.exception),
136                         'Cannot delete extension policy without extension ID')
137
138        with self.assertRaises(error.TestError) as context:
139            self.policy_manager.remove_policy('P2', 'extension', 'ID1')
140        self.assertEqual(str(context.exception),
141                         'Policy P2 missing from extension policies.')
142
143    @patch(PATCH_PATH)
144    def test_verify_policies(self, get_pol_mock):
145        """Test the verify_policies method."""
146        self.configPolicyManager()
147        self.policy_manager.configure_policies(
148            user={'Test Policy1': 'Policy1'},
149            device={'Device Policy1': 'Device 1'},
150            suggested_user={'Suggested Policy 1': 'Suggested 1'},
151            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
152        get_pol_mock.return_value = self.expected
153        self.policy_manager.obtain_policies_from_device()
154        self.policy_manager.verify_policies()
155
156        # Add another policy to the configured, and test verify_policies fails.
157        self.policy_manager.configure_policies(extension={'ID1': {'P1': 'V1'}},
158                                               new=False)
159        with self.assertRaises(error.TestError) as context:
160            self.policy_manager.verify_policies()
161        self.assertEqual(
162            str(context.exception),
163            'Configured policies did not match policies received from DUT.')
164
165    @patch(PATCH_PATH)
166    def test_verify_special_extension(self, get_pol_mock):
167        """Test the configure_extension_visual_policy and the verify_policies
168        methods."""
169        self.configPolicyManager()
170        self.policy_manager.configure_policies(
171            user={'Test Policy1': 'Policy1'},
172            device={'Device Policy1': 'Device 1'},
173            suggested_user={'Suggested Policy 1': 'Suggested 1'},
174            extension={'ExtID1': {'Afile': 'Adirectory'}})
175        get_pol_mock.return_value = self.expected
176        self.policy_manager.obtain_policies_from_device()
177        with self.assertRaises(error.TestError) as context:
178            self.policy_manager.verify_policies()
179        self.assertEqual(
180            str(context.exception),
181            'Configured policies did not match policies received from DUT.')
182
183        # Add the "visual" policy (aka how the policy should be reported).
184        self.policy_manager.configure_extension_visual_policy(
185            {'ExtID1': {'Extension Policy 1': 'EP1'}})
186        self.policy_manager.verify_policies()
187
188        # Test the configured shows actual policy value by default
189        configured_policies = (self.policy_manager.
190                               get_configured_policies_as_dict())
191        expected_extension = {'ExtID1':
192                                {'Afile':
193                                    {'scope': 'user',
194                                     'level': 'mandatory',
195                                     'value': 'Adirectory',
196                                     'source': 'cloud'}}}
197        self.assertEqual(expected_extension,
198                         configured_policies['extensionPolicies'])
199
200        # Finally, test the configured can also get the 'visual' policy.
201        visual_configured_policies = (self.policy_manager.
202                                      get_configured_policies_as_dict(True))
203        expected_extension = {'ExtID1':
204                                {'Extension Policy 1':
205                                    {'scope': 'user',
206                                     'level': 'mandatory',
207                                     'value': 'EP1',
208                                     'source': 'cloud'}}}
209        self.assertEqual(expected_extension,
210                         visual_configured_policies['extensionPolicies'])
211
212    @patch(PATCH_PATH)
213    def test_get_policy_value_from_DUT(self, get_pol_mock):
214        """Test obtaining a single policy value from the DUT:
215            Getting a policy prior to obtaining (returns None)
216            Getting a normal chrome policy, using the Refresh flag
217            Getting an ExtensionPolicy
218            """
219        self.configPolicyManager()
220        policy_value = self.policy_manager.get_policy_value_from_DUT(
221            'Test Policy1')
222        self.assertEqual(policy_value, None)
223        get_pol_mock.return_value = self.expected
224        policy_value = self.policy_manager.get_policy_value_from_DUT(
225            'Test Policy1', refresh=True)
226        self.assertEqual(policy_value, 'Policy1')
227        extension_policy_value = self.policy_manager.get_policy_value_from_DUT(
228            'Extension Policy 1', 'ExtID1', False)
229        self.assertEqual(extension_policy_value, 'EP1')
230
231    def test_DMServerConfig(self):
232        """
233        Test the getDMConfig method via the following:
234            Test the default DM json
235            Test the DM Json when policies are configured.
236
237        """
238        self.configPolicyManager('Test_UserName')
239        base_config = json.loads(self.policy_manager.getDMConfig())
240        default_dm = (
241            {'invalidation_name': 'test_policy',
242             'invalidation_source': 16,
243             'google/chromeos/device': {},
244             'current_key_index': 0,
245             'google/chrome/extension': {},
246             'managed_users': ['*'],
247             'google/chromeos/user': {'recommended': {}, 'mandatory': {}},
248             'policy_user': 'Test_UserName'})
249        self.assertEqual(base_config, default_dm)
250
251        self.policy_manager.configure_policies(
252            user={'Test Policy1': 'Policy1'},
253            device={'SystemTimezone': 'Device 1'},
254            suggested_user={'Suggested Policy 1': 'Suggested 1'},
255            extension={'ExtID1': {'Extension Policy 1': 'EP1'}})
256
257        expected_DMJson = (
258            {"invalidation_name": "test_policy",
259             "invalidation_source": 16,
260             "google/chromeos/device":
261                {"system_timezone.timezone": "Device 1"},
262             "current_key_index": 0,
263             "google/chrome/extension":
264                {"ExtID1": {"Extension Policy 1": "EP1"}},
265             "managed_users": ["*"],
266             "google/chromeos/user":
267                {"recommended": {"Suggested Policy 1": "Suggested 1"},
268                 "mandatory": {"Test Policy1": "Policy1"}},
269             "policy_user": "Test_UserName"})
270        Dm_with_policies = json.loads(self.policy_manager.getDMConfig())
271        self.assertEqual(Dm_with_policies, expected_DMJson)
272
273    def test_getCloudDpc(self):
274        """
275        Test getCloudDpc and the following private methods:
276            _arc_certs
277            _add_shared_arc_policy
278            _add_shared_policies
279            _add_arc_certs
280
281        """
282        self.configPolicyManager()
283        self.policy_manager.configure_policies(
284            user={'Test Policy1': 'Policy1',
285                  'ArcCertificatesSyncMode': 'TestValue1',
286                  'OpenNetworkConfiguration': 'Apolicy',
287                  'VideoCaptureAllowed': True,
288                  'ArcPolicy':
289                    {'applications': 'SomeApplication',
290                     'OtherPolicy': 'OtherValue'}
291                  })
292        expected = {'applications': 'SomeApplication',
293                    'cameraDisabled': True,
294                    'caCerts': 'Apolicy'}
295        cloudDpcPolicies = self.policy_manager.getCloudDpc()
296        self.assertEqual(expected, cloudDpcPolicies)
297
298
299if __name__ == '__main__':
300    unittest.main()
301