#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Python module for Spirent GSS6450 GNSS RPS."""

import datetime
import numbers
from acts.controllers import abstract_inst


class GSS6450Error(abstract_inst.SocketInstrumentError):
    """GSS6450 Instrument Error Class."""


class GSS6450(abstract_inst.RequestInstrument):
    """GSS6450 Class, inherited from abstract_inst RequestInstrument."""

    # Maps the numeric storage code reported by the instrument to the label
    # returned by get_storage_media().
    _STORAGE_MEDIA_LABELS = {'1': '1-INTERNAL', '2': '2-REMOVABLE'}

    def __init__(self, ip_addr):
        """Init method for GSS6450.

        Args:
            ip_addr: IP Address.
                Type, str.
        """
        super(GSS6450, self).__init__(ip_addr)

        self.idn = 'Spirent-GSS6450'

    def _put(self, cmd):
        """Send put command via GSS6450 HTTP Request and get response.

        Args:
            cmd: parameters listed in SHM_PUT.
                Type, Str.

        Returns:
            resp: Response from the _query method.
                Type, Str.
        """
        return self._query('shm_put.shtml?' + cmd)

    def _get(self, cmd):
        """Send get command via GSS6450 HTTP Request and get response.

        Args:
            cmd: parameters listed in SHM_GET.
                Type, Str.

        Returns:
            resp: Response from the _query method.
                Type, Str.
        """
        return self._query('shm_get.shtml?' + cmd)

    def _get_value(self, flag, separator=None):
        """Query one SHM_GET parameter and extract its value text.

        The value is whatever follows the last occurrence of the separator
        in the raw response, with surrounding space characters removed.

        Args:
            flag: SHM_GET parameter to query, e.g. '-m'.
                Type, Str.
            separator: Text the response is split on.
                Type, Str.
                Default, None, which splits on the flag itself.

        Returns:
            value: Extracted value text.
                Type, Str.
        """
        resp_raw = self._get(flag)
        delimiter = flag if separator is None else separator

        return resp_raw.split(delimiter)[-1].strip(' ')

    def get_scenario_filename(self):
        """Get the scenario filename of GSS6450.

        Returns:
            filename: RPS Scenario file name.
                Type, Str.
        """
        # Unlike the other getters, this response is split on ':'.
        filename = self._get_value('-f', separator=':')
        self._logger.debug('Got scenario file name: "%s".', filename)

        return filename

    def get_scenario_description(self):
        """Get the scenario description of GSS6450.

        Returns:
            description: RPS Scenario description.
                Type, Str.
        """
        description = self._get_value('-d')

        if description:
            self._logger.debug('Got scenario description: "%s".', description)
        else:
            self._logger.warning('Got scenario description with empty string.')

        return description

    def get_scenario_location(self):
        """Get the scenario location of GSS6450.

        Returns:
            location: RPS Scenario location.
                Type, Str.
        """
        location = self._get_value('-i')

        if location:
            self._logger.debug('Got scenario location: "%s".', location)
        else:
            self._logger.warning('Got scenario location with empty string.')

        return location

    def get_operation_mode(self):
        """Get the operation mode of GSS6450.

        Returns:
            mode: RPS Operation Mode.
                Type, Str.
                Option, STOPPED/PLAYING/RECORDING
        """
        mode = self._get_value('-m')
        self._logger.debug('Got operation mode: "%s".', mode)

        return mode

    def get_battery_level(self):
        """Get the battery level of GSS6450.

        Returns:
            batterylevel: RPS Battery Level.
                Type, float.
        """
        batterylevel = float(self._get_value('-l'))
        self._logger.debug('Got battery level: %s%%.', batterylevel)

        return batterylevel

    def get_rfport_voltage(self):
        """Get the RF port voltage of GSS6450.

        Returns:
            voltageout: RPS RF port voltage.
                Type, str
        """
        voltageout = self._get_value('-v')
        self._logger.debug('Got RF port voltage: "%s".', voltageout)

        return voltageout

    def get_storage_media(self):
        """Get the storage media of GSS6450.

        Returns:
            media: RPS storage, '1-INTERNAL' or '2-REMOVABLE'.
                Type, str

        Raises:
            GSS6450Error: raise when request response is not support.
        """
        resp_num = self._get_value('-M')
        media = self._STORAGE_MEDIA_LABELS.get(resp_num)

        if media is None:
            errmsg = ('"{}" is not recognized as GSS6450 valid storage media'
                      ' type'.format(resp_num))
            raise GSS6450Error(error=errmsg, command='get_storage_media')

        self._logger.debug('Got current storage media: %s.', media)

        return media

    def get_attenuation(self):
        """Get the attenuation of GSS6450.

        Returns:
            attenuation: RPS attenuation level, in dB.
                Type, list of float.
        """
        resp_str = self._get_value('-a')
        self._logger.debug('Got attenuation: %s dB.', resp_str)

        # The response is a comma-separated list of levels.
        return [float(itm) for itm in resp_str.split(',')]

    def get_elapsed_time(self):
        """Get the running scenario elapsed time of GSS6450.

        Returns:
            etime: RPS elapsed time.
                Type, datetime.timedelta.

        Raises:
            ValueError: raise when the response is not three non-negative
                integer fields separated by ':' (hours:minutes:seconds).
        """
        resp_str = self._get_value('-e')
        self._logger.debug('Got senario elapsed time: "%s".', resp_str)

        # Parse the fields directly instead of using strptime('%H:%M:%S'):
        # an elapsed time is a duration, and '%H' rejects any hour count
        # above 23, which would break on runs of 24 hours or longer.
        try:
            hours, minutes, seconds = [
                int(field) for field in resp_str.split(':')
            ]
        except ValueError as err:
            raise ValueError(
                'elapsed time "{}" does not match format '
                '"H:M:S"'.format(resp_str)) from err

        if min(hours, minutes, seconds) < 0:
            raise ValueError(
                'elapsed time "{}" contains a negative field'.format(resp_str))

        return datetime.timedelta(hours=hours,
                                  minutes=minutes,
                                  seconds=seconds)

    def get_playback_offset(self):
        """Get the running scenario playback offset of GSS6450.

        Returns:
            offset: RPS playback offset.
                Type, datetime.timedelta.
        """
        offset_tmp = float(self._get_value('-o'))
        self._logger.debug('Got senario playback offset: %s sec.', offset_tmp)

        return datetime.timedelta(seconds=offset_tmp)

    def play_scenario(self, scenario=''):
        """Start to play scenario in GSS6450.

        Args:
            scenario: Scenario to play.
                Type, str.
                Default, '', which will run current selected one.
        """
        if scenario:
            cmd = '-f{},-wP'.format(scenario)
            infmsg = 'Started playing scenario: "{}".'.format(scenario)
        else:
            cmd = '-wP'
            infmsg = 'Started playing current scenario.'

        self._put(cmd)
        self._logger.debug(infmsg)

    def record_scenario(self, scenario=''):
        """Start to record scenario in GSS6450.

        Args:
            scenario: Scenario to record.
                Type, str.
                Default, '', which will run current selected one.
        """
        if scenario:
            cmd = '-f{},-wR'.format(scenario)
            infmsg = 'Started recording scenario: "{}".'.format(scenario)
        else:
            cmd = '-wR'
            infmsg = 'Started recording scenario.'

        self._put(cmd)
        self._logger.debug(infmsg)

    def stop_scenario(self):
        """Start to stop playing/recording scenario in GSS6450."""
        self._put('-wS')

        self._logger.debug('Stopped playing/recording scanrio.')

    def set_rfport_voltage(self, voltageout):
        """Set the RF port voltage of GSS6450.

        Args:
            voltageout: RPS RF port voltage.
                Type, str. Option, 'OFF', '3.3V', '5V'

        Raises:
            GSS6450Error: raise when voltageout input is not valid.
        """
        if voltageout == 'OFF':
            voltage_cmd = '0'
        elif voltageout == '3.3V':
            voltage_cmd = '3'
        elif voltageout == '5V':
            voltage_cmd = '5'
        else:
            errmsg = ('"{}" is not recognized as GSS6450 valid RF port voltage'
                      ' type'.format(voltageout))
            raise GSS6450Error(error=errmsg, command='set_rfport_voltage')

        self._put('-v{},-wV'.format(voltage_cmd))
        self._logger.debug('Set RF port voltage: "%s".', voltageout)

    def set_attenuation(self, attenuation):
        """Set the attenuation of GSS6450.

        Non-integer input is rounded to the nearest integer (with a warning)
        before it is sent.

        Args:
            attenuation: RPS attenuation level, in dB.
                Type, numerical.

        Raises:
            GSS6450Error: raise when attenuation is not in range.
        """
        if not 0 <= attenuation <= 31:
            errmsg = ('"attenuation" must be within [0, 31], '
                      'current input is {}').format(str(attenuation))
            raise GSS6450Error(error=errmsg, command='set_attenuation')

        attenuation_raw = round(attenuation)

        if attenuation_raw != attenuation:
            self._logger.warning(
                '"attenuation" must be integer, current input '
                'will be rounded to %s', attenuation_raw)

        self._put('-a{},-wA'.format(attenuation_raw))

        self._logger.debug('Set attenuation: %s dB.', attenuation_raw)

    def set_playback_offset(self, offset):
        """Set the playback offset of GSS6450.

        Args:
            offset: RPS playback offset.
                Type, datetime.timedelta, or numerical.
                A timedelta is converted to its total number of seconds.

        Raises:
            GSS6450Error: raise when offset is not numeric or timedelta.
        """
        if isinstance(offset, datetime.timedelta):
            offset_raw = offset.total_seconds()
        elif isinstance(offset, numbers.Number):
            offset_raw = offset
        else:
            raise GSS6450Error(error=('"offset" must be numerical value or '
                                      'datetime.timedelta'),
                               command='set_playback_offset')

        self._put('-o{}'.format(offset_raw))

        self._logger.debug('Set playback offset: %s sec.', offset_raw)

    def set_storage_media(self, media):
        """Set the storage media of GSS6450.

        After sending the command the current media is read back and
        compared with the requested one.

        Args:
            media: RPS storage Media, Internal or External.
                Type, str. Option, 'internal', 'removable'

        Raises:
            GSS6450Error: raise when media option is not support, or when
                the media read back differs from the requested one.
        """
        if media == 'internal':
            raw_media = '1'
        elif media == 'removable':
            raw_media = '2'
        else:
            raise GSS6450Error(
                error=('"media" input must be in ["internal", "removable"]. '
                       ' Current input is {}'.format(media)),
                command='set_storage_media')

        # NOTE(review): every other multi-parameter put command in this class
        # separates its parameters with a comma (e.g. '-a{},-wA'); this one
        # does not. Left unchanged because the device command syntax is not
        # visible here - confirm against the GSS6450 remote-control docs.
        self._put('-M{}-wM'.format(raw_media))

        # get_storage_media() returns '1-INTERNAL' or '2-REMOVABLE', so the
        # first character is the numeric media code.
        resp_raw = self.get_storage_media()
        if raw_media != resp_raw[0]:
            raise GSS6450Error(
                error=('Setting media "{}" is not the same as queried media '
                       '"{}".'.format(media, resp_raw)),
                command='set_storage_media')