import json
import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.enterprise import enterprise_policy_utils
from autotest_lib.client.cros.enterprise.policy_group import AllPolicies


# Top-level keys of the raw policy data returned by
# enterprise_policy_utils.get_all_policies().
CHROMEPOLICIES = 'chromePolicies'
DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
EXTENSIONPOLICIES = 'extensionPolicies'

# Maps a Chrome OS policy name to the clouddpc (ARC) setting name it is
# reported under by getCloudDpc().
CHROMEOS_CLOUDDPC = {
    'AudioCaptureAllowed': 'unmuteMicrophoneDisabled',
    'DefaultGeolocationSetting': 'shareLocationDisabled',
    'DeviceBlockDevmode': 'debuggingFeaturesDisabled',
    'DisableScreenshots': 'screenCaptureDisabled',
    'ExternalStorageDisabled': 'usbFileTransferDisabled',
    'VideoCaptureAllowed': 'cameraDisabled',
}


class Policy_Manager(object):
    """
    Tracks the policies configured by a test and the policies obtained
    from the DUT.

    This class is to handle:
        Setting policies to a Fake DM server
        Obtaining policies from a DUT
        Obtaining clouddpc policy settings.
        Obtaining both set/received policies in many different data formats
        Verifying policies provided to the DUT are correct.

    It has been designed so all the features are independent, meaning a
    fake_dm_server is not required to obtain policies from the DUT, get
    clouddpc values, etc.

    """

    def __init__(self, username=None, fake_dm_server=None):
        """
        Creates empty configured/obtained policy holders.

        @param username: username to be used when creating the fake DM
                policy.
        @param fake_dm_server: The fake DM server object.

        """
        # NOTE(review): the meaning of the positional True is defined by
        # policy_group.AllPolicies; presumably it marks the "configured"
        # (as opposed to "obtained") set -- confirm there.
        self._configured = AllPolicies(True)
        self._obtained = AllPolicies()
        self.CHROME = 'chrome'
        self.LOCAL = 'local'
        self.EXT = 'ext'
        self.username = username
        self.fake_dm_server = fake_dm_server
        self.autotest_ext = None

        # If a fake_dm_server is provided, enable auto_update by default.
        # Can be turned off if desired.
        self._auto_updateDM = bool(fake_dm_server)

    def configure_policies(self,
                           user=None,
                           suggested_user=None,
                           device=None,
                           extension=None,
                           new=True):
        """
        Used to configure desired policies on the DUT. Will also save the
        configured policies.

        If a fake_dm_server was provided on class initialization, and
        auto_updateDM is not False, this will also update the fake_dm_server.

        @param user/suggested_user/device/extension: dict of the policies to
                be set. extension must be provided with the extension id.
                e.g. {'ExtensionID': {policy_dict}}. Omitted (None)
                arguments are treated as empty dicts.
        @param new: bool, if True clear all previously configured policies.

        """
        # Build a fresh dict per call rather than sharing one mutable
        # default object across every invocation.
        user = {} if user is None else user
        suggested_user = {} if suggested_user is None else suggested_user
        device = {} if device is None else device
        extension = {} if extension is None else extension

        if new:
            self._configured = AllPolicies(True)

        self._configured.set_policy(self.CHROME, user, 'user')
        self._configured.set_policy(self.CHROME, suggested_user,
                                    'suggested_user')
        self._configured.set_policy(self.CHROME, device, 'device')

        self._configured.set_extension_policy(extension)

        if self.auto_updateDM:
            self.updateDMServer()

    def configure_extension_visual_policy(self,
                                          ext_policy=None,
                                          new=True):
        """
        Extensions are... different. The policy set for them often is not the
        actual policy, but a pointer to where the policy data is. This makes
        verifying the extension policy tricky.

        To help with this, this function will allow you to set the 'real'
        policy. Important things to note: This is only useful for verifying
        the policy, and getting the policy as a dictionary (which has a flag
        for which style of extension policies you want to see). The DM server
        will not be set via this.

        @param ext_policy: dict, Extension policy must be provided with the
                extension id:
                {'ExtensionID': {policy_dict}}. None is treated as an empty
                dict.
        @param new: bool, if True will erase any previously stored VISUAL
                extension data.

        """
        # Fresh dict per call; avoids a shared mutable default.
        ext_policy = {} if ext_policy is None else ext_policy

        if new:
            self._configured.ext_values = {}

        self._configured.set_extension_policy(ext_policy, True)

    def remove_policy(self,
                      policy_name,
                      policy_type,
                      extID=None):
        """
        Removes the policy from the configured policies. Useful when you want
        to clear a specific policy, but leave the other policies untouched.
        If auto_updateDM is True (thus a fake_dm_server has been provided),
        the dm server will be updated.

        @param policy_name: The policy name
        @param policy_type: The type of policy it is. Valid types:
                "user", "device", "extension", "suggested_user". Any value
                other than "extension" is handled as a chrome policy.
        @param extID: The extension ID, if removing an extension policy.

        @raises error.TestError: if the policy cannot be found, or an
                extension policy is requested without an extID.

        """
        if policy_type != 'extension':
            self._removeChromePolicy(policy_name)
        else:
            self._removeExtensionPolicy(policy_name, extID)

        if self.auto_updateDM:
            self.updateDMServer()

    def _removeChromePolicy(self, policy_name):
        """
        Attempts to remove the specified policy from the configured chrome
        policies.

        @param policy_name: The policy name.

        @raises error.TestError: If the policy is not in the configured
                policies.

        """
        try:
            del self._configured.chrome[policy_name]
        except KeyError:
            raise error.TestError('Policy {} missing from chrome policies.'
                                  .format(policy_name))

    def _removeExtensionPolicy(self, policy_name, extID):
        """
        Attempts to remove the specified extension policy from specified
        extension.

        @param policy_name: The policy name.
        @param extID: The ID of the extension holding the policy.

        @raises error.TestError: if the extID is not provided, or the policy
                (or the extension itself) is not found in the configured
                extension data.

        """
        if not extID:
            raise error.TestError(
                'Cannot delete extension policy without extension ID')
        try:
            del self._configured.extension_configured_data[extID][policy_name]
        except KeyError:
            raise error.TestError(
                'Policy {} missing from extension policies.'
                .format(policy_name))

    def obtain_policies_from_device(self, autotest_ext=None):
        """
        Calls the autotest private getAllEnterprisePolicies() API, and saves
        the response.

        The raw response is kept in self.raw_data and is also loaded into the
        obtained-policies holder.

        @param autotest_ext: The autotest browser extension. If omitted, the
                extension supplied on a previous call is reused.

        @raises error.TestError: if no autotest extension is available.

        """
        if autotest_ext:
            self.autotest_ext = autotest_ext
        if not self.autotest_ext:
            raise error.TestError('Cannot obtain policies without autotest_ext')
        self.raw_data = enterprise_policy_utils.get_all_policies(
            self.autotest_ext)
        self._obtained.set_policy(self.CHROME, self.raw_data[CHROMEPOLICIES])
        self._obtained.set_policy(self.LOCAL,
                                  self.raw_data[DEVICELOCALACCOUNT])
        self._obtained.set_extension_policy(self.raw_data[EXTENSIONPOLICIES])

    def verify_policy(self, policyName, policy_value, extID=None):
        """
        Verifies a single policy on the DUT has the expected value.

        The policies are re-read from the DUT first, using the previously
        provided autotest extension.

        @param policyName: str, the name of the policy.
        @param policy_value: The value the policy is expected to have.
        @param extID: The ID of the extension, for extension policies.

        @raises error.TestError: if the value on the DUT does not match.

        """
        received_value = self.get_policy_value_from_DUT(policyName=policyName,
                                                        extID=extID,
                                                        refresh=True)
        # "not ==" (rather than "!=") so only __eq__ is relied upon.
        if not received_value == policy_value:
            raise error.TestError(
                'Policy {} value was not set correctly. \nExpected:\t {}'
                '\nReceived:\t {}'.format(policyName,
                                          policy_value,
                                          received_value))
        logging.info('Policy verification successful')

    def verify_policies(self):
        """
        Verifies the configured policies are == to the policies obtained.

        @raises error.TestError: if the two sets of policies differ.

        """
        # "not ==" (rather than "!=") so only AllPolicies.__eq__ is relied
        # upon.
        if not self._configured == self._obtained:
            raise error.TestError(
                'Configured policies did not match policies received from DUT.')
        logging.info('Policy verification successful')

    def get_policy_value_from_DUT(self, policyName, extID=None, refresh=False):
        """
        Get the value of a specified policy from the DUT. If the policy is
        from an extension, an extension ID (extID) must be provided.

        @param policyName: str, the name of the policy.
        @param extID: The ID of the extension. When given, only that
                extension's policies are searched.
        @param refresh: bool, if you want to get the policies from the DUT
                again. Note: This does NOT force the DUT to re-obtain the
                policies from the DM server.

        @returns: The value of the policy if found, else None.

        """
        if refresh:
            # Uses the previously provided autotest Extension.
            self.obtain_policies_from_device()

        if extID:
            policies = self._obtained.extension_configured_data
            try:
                policy = policies[extID][policyName]
            except KeyError:
                # Either the extension or the policy is unknown; both count
                # as "not found".
                return None
            return policy.value

        try:
            policy = self._obtained.chrome[policyName]
        except KeyError:
            return None
        return policy.value

    def updateDMServer(self):
        """
        Updates the Fake DM server with the current configured policy.

        @raises error.TestError: if no fake DM server was provided.

        """
        fake_dm_json = self.getDMConfig()
        logging.info('Policy blob %s', fake_dm_json)
        if not self.fake_dm_server:
            raise error.TestError(
                'Cannot update DM server. DM server not provided')
        self.fake_dm_server.setup_policy(fake_dm_json)

    def getDMConfig(self, refresh=True):
        """
        Creates the DM configuration (aka Json blob) to be used by the
        fake DM server

        @param refresh: bool, if True, will clear any previous configuration.
                If False, return the currently configured DM blob.

        @returns: str, the JSON-serialized DM configuration.

        """
        if refresh:
            self._configured.createNewFakeDMServerJson()
            self._configured.updateDMJson()
            self._configured._DMJSON['policy_user'] = self.username
        return json.dumps(self._configured._DMJSON)

    def getCloudDpc(self):
        """
        Gets the Cloud DPC ARC policy settings.

        @returns: dict of the clouddpc settings expected from the configured
                policies.

        """
        expected_cloud_dpc_settings = {}
        if self._arc_certs():
            self._add_arc_certs(expected_cloud_dpc_settings)
        self._add_shared_policies(expected_cloud_dpc_settings)
        self._add_shared_arc_policy(expected_cloud_dpc_settings)

        return expected_cloud_dpc_settings

    def _arc_certs(self):
        """
        Returns True if ArcCertificatesSyncMode is set in the configured
        policies and the bool(value) is True, else False.

        """
        chrome = self._configured.chrome
        return bool('ArcCertificatesSyncMode' in chrome and
                    chrome['ArcCertificatesSyncMode'].value)

    def _add_shared_arc_policy(self, dpc):
        """
        Adds the shared policies that are subset within the 'ArcPolicy'.

        Nothing is added when ArcPolicy is not configured or has no value.

        @param dpc: dict, the clouddpc settings being built; updated in
                place.

        """
        arc_policy = self._configured.chrome.get('ArcPolicy')
        arc_values = arc_policy.value if arc_policy else None
        if not arc_values:
            # Unset policy, or a policy whose value is empty/None.
            return

        for key in ('applications', 'accountTypesWithManagementDisabled'):
            if key in arc_values:
                dpc[key] = arc_values[key]

    def _add_shared_policies(self, dpc):
        """
        Add all of the configured policies that are shared with arc clouddpc,
        to the "dpc" dict, with the clouddpc key. If the policy is not set, it
        will not be added.

        @param dpc: dict, the clouddpc settings being built; updated in
                place.

        """
        chrome = self._configured.chrome
        for policy_name, dpc_name in CHROMEOS_CLOUDDPC.items():
            if policy_name in chrome:
                dpc[dpc_name] = chrome[policy_name].value

    def _add_arc_certs(self, dpc):
        """
        Sets dpc['caCerts'] to the configured OpenNetworkConfiguration
        value, or to None when that policy is not configured.

        @param dpc: dict, the clouddpc settings being built; updated in
                place.

        """
        open_network_config = 'OpenNetworkConfiguration'
        chrome = self._configured.chrome
        if open_network_config in chrome:
            dpc['caCerts'] = chrome[open_network_config].value
        else:
            dpc['caCerts'] = None

    def get_configured_policies_as_dict(self, visual=False):
        """
        Returns the configured policies as a dict.

        @param visual: bool, forwarded to AllPolicies.get_policy_as_dict;
                selects which style of extension policies is returned.

        """
        return self._configured.get_policy_as_dict(visual)

    def get_obtained_policies_as_dict(self):
        """Returns the obtained policies as a dict."""
        return self._obtained.get_policy_as_dict(visual=True)

    @property
    def auto_updateDM(self):
        """Returns the current state of the auto_updateDM setting."""
        return self._auto_updateDM

    @auto_updateDM.setter
    def auto_updateDM(self, value):
        """
        Turns on/off auto updating of the DM server.

        @param value: bool, the new setting.

        @raises error.TestError: if value is not a bool, or if it is True
                while no fake DM server is configured.

        """
        if not isinstance(value, bool):
            raise error.TestError('Auto Update DM must be bool, got {}'
                                  .format(value))
        if value and not self.fake_dm_server:
            raise error.TestError(
                'Cannot autoupdate without the Fake DM server configured.')
        self._auto_updateDM = value