1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#           http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""Python module for Spirent GSS6450 GNSS RPS."""
17
18import datetime
19import numbers
20from acts.controllers import abstract_inst
21
22
class GSS6450Error(abstract_inst.SocketInstrumentError):
    """Exception raised by the GSS6450 controller.

    Raised for invalid arguments passed to GSS6450 setters and for
    instrument responses that the controller does not recognize.
    """
25
26
class GSS6450(abstract_inst.RequestInstrument):
    """GSS6450 Class, inherited from abstract_inst RequestInstrument.

    Read accessors send a 'shm_get.shtml?<flag>' request and parse the text
    that follows the flag in the response. Write accessors send a
    'shm_put.shtml?<flag><value>...' request.
    """

    def __init__(self, ip_addr):
        """Init method for GSS6450.

        Args:
            ip_addr: IP Address.
                Type, str.
        """
        super(GSS6450, self).__init__(ip_addr)

        self.idn = 'Spirent-GSS6450'

    @staticmethod
    def _value_after(resp_raw, marker):
        """Extract the value that follows a marker in a raw response.

        Args:
            resp_raw: Raw response returned by _get.
                Type, Str.
            marker: Text that precedes the value, e.g. '-d' or ':'.
                Type, Str.

        Returns:
            Text after the last occurrence of marker, with surrounding
            whitespace removed. When marker is absent, the whole response
            (stripped) is returned.
                Type, Str.
        """
        # All whitespace is stripped (not only blanks) so that a stray CR/LF
        # at the end of the response cannot break the comparisons and
        # numeric/time parsing done by the callers.
        # NOTE(review): the LAST occurrence of marker is used, so a free-text
        # value that itself contains the marker would be truncated - confirm
        # against the instrument's response format.
        return resp_raw.split(marker)[-1].strip()

    def _put(self, cmd):
        """Send put command via GSS6450 HTTP Request and get response.

        Args:
            cmd: parameters listed in SHM_PUT.
                Type, Str.

        Returns:
            resp: Response from the _query method.
                Type, Str.
        """
        return self._query('shm_put.shtml?' + cmd)

    def _get(self, cmd):
        """Send get command via GSS6450 HTTP Request and get response.

        Args:
            cmd: parameters listed in SHM_GET.
                Type, Str.

        Returns:
            resp: Response from the _query method.
                Type, Str.
        """
        return self._query('shm_get.shtml?' + cmd)

    def get_scenario_filename(self):
        """Get the scenario filename of GSS6450.

        Returns:
            filename: RPS Scenario file name.
                Type, Str.
        """
        # Unlike the other getters, the file name is whatever follows the
        # last ':' in the response rather than what follows the flag.
        filename = self._value_after(self._get('-f'), ':')
        self._logger.debug('Got scenario file name: "%s".', filename)

        return filename

    def get_scenario_description(self):
        """Get the scenario description of GSS6450.

        Logs a warning when the description is empty.

        Returns:
            description: RPS Scenario description.
                Type, Str.
        """
        description = self._value_after(self._get('-d'), '-d')

        if description:
            self._logger.debug('Got scenario description: "%s".', description)
        else:
            self._logger.warning('Got scenario description with empty string.')

        return description

    def get_scenario_location(self):
        """Get the scenario location of GSS6450.

        Logs a warning when the location is empty.

        Returns:
            location: RPS Scenario location.
                Type, Str.
        """
        location = self._value_after(self._get('-i'), '-i')

        if location:
            self._logger.debug('Got scenario location: "%s".', location)
        else:
            self._logger.warning('Got scenario location with empty string.')

        return location

    def get_operation_mode(self):
        """Get the operation mode of GSS6450.

        Returns:
            mode: RPS Operation Mode.
                Type, Str.
                Option, STOPPED/PLAYING/RECORDING
        """
        mode = self._value_after(self._get('-m'), '-m')
        self._logger.debug('Got operation mode: "%s".', mode)

        return mode

    def get_battery_level(self):
        """Get the battery level of GSS6450.

        Returns:
            batterylevel: RPS Battery Level.
                Type, float.

        Raises:
            ValueError: when the response cannot be converted to float.
        """
        batterylevel = float(self._value_after(self._get('-l'), '-l'))
        self._logger.debug('Got battery level: %s%%.', batterylevel)

        return batterylevel

    def get_rfport_voltage(self):
        """Get the RF port voltage of GSS6450.

        Returns:
            voltageout: RPS RF port voltage.
                Type, str
        """
        voltageout = self._value_after(self._get('-v'), '-v')
        self._logger.debug('Got RF port voltage: "%s".', voltageout)

        return voltageout

    def get_storage_media(self):
        """Get the storage media of GSS6450.

        Returns:
            media: RPS storage, '1-INTERNAL' or '2-REMOVABLE'.
                Type, str

        Raises:
            GSS6450Error: raise when request response is not support.
        """
        resp_num = self._value_after(self._get('-M'), '-M')

        if resp_num == '1':
            media = '1-INTERNAL'
        elif resp_num == '2':
            media = '2-REMOVABLE'
        else:
            errmsg = ('"{}" is not recognized as GSS6450 valid storage media'
                      ' type'.format(resp_num))
            raise GSS6450Error(error=errmsg, command='get_storage_media')

        self._logger.debug('Got current storage media: %s.', media)

        return media

    def get_attenuation(self):
        """Get the attenuation of GSS6450.

        Returns:
            attenuation: RPS attenuation level, in dB. One entry per
                comma-separated value in the response.
                Type, list of float.

        Raises:
            ValueError: when a value cannot be converted to float.
        """
        resp_str = self._value_after(self._get('-a'), '-a')
        self._logger.debug('Got attenuation: %s dB.', resp_str)

        return [float(level) for level in resp_str.split(',')]

    def get_elapsed_time(self):
        """Get the running scenario elapsed time of GSS6450.

        The response is parsed as 'H:M:S'. The hour field is not limited to
        0-23, so elapsed times of 24 hours or more are supported.

        Returns:
            etime: RPS elapsed time.
                Type, datetime.timedelta.

        Raises:
            ValueError: when the response is not three ':'-separated
                integers.
        """
        resp_str = self._value_after(self._get('-e'), '-e')
        self._logger.debug('Got senario elapsed time: "%s".', resp_str)

        # strptime('%H:%M:%S') would reject hour values above 23; an elapsed
        # time is a duration, not a time of day, so parse the fields as plain
        # integers. A wrong field count or a non-integer field still raises
        # ValueError, as strptime did.
        hours, minutes, seconds = (int(part) for part in resp_str.split(':'))

        return datetime.timedelta(hours=hours,
                                  minutes=minutes,
                                  seconds=seconds)

    def get_playback_offset(self):
        """Get the running scenario playback offset of GSS6450.

        Returns:
            offset: RPS playback offset.
                Type, datetime.timedelta.

        Raises:
            ValueError: when the response cannot be converted to float.
        """
        offset_sec = float(self._value_after(self._get('-o'), '-o'))
        self._logger.debug('Got senario playback offset: %s sec.', offset_sec)

        return datetime.timedelta(seconds=offset_sec)

    def play_scenario(self, scenario=''):
        """Start to play scenario in GSS6450.

        Args:
            scenario: Scenario to play.
                Type, str.
                Default, '', which will run current selected one.
        """
        if scenario:
            self._put('-f{},-wP'.format(scenario))
            self._logger.debug('Started playing scenario: "%s".', scenario)
        else:
            self._put('-wP')
            self._logger.debug('Started playing current scenario.')

    def record_scenario(self, scenario=''):
        """Start to record scenario in GSS6450.

        Args:
            scenario: Scenario to record.
                Type, str.
                Default, '', which will run current selected one.
        """
        if scenario:
            self._put('-f{},-wR'.format(scenario))
            self._logger.debug('Started recording scenario: "%s".', scenario)
        else:
            self._put('-wR')
            self._logger.debug('Started recording scenario.')

    def stop_scenario(self):
        """Stop playing/recording scenario in GSS6450."""
        self._put('-wS')

        self._logger.debug('Stopped playing/recording scanrio.')

    def set_rfport_voltage(self, voltageout):
        """Set the RF port voltage of GSS6450.

        Args:
            voltageout: RPS RF port voltage.
                Type, str
                Option, 'OFF', '3.3V', '5V'

        Raises:
            GSS6450Error: raise when voltageout input is not valid.
        """
        if voltageout == 'OFF':
            voltage_cmd = '0'
        elif voltageout == '3.3V':
            voltage_cmd = '3'
        elif voltageout == '5V':
            voltage_cmd = '5'
        else:
            errmsg = ('"{}" is not recognized as GSS6450 valid RF port voltage'
                      ' type'.format(voltageout))
            raise GSS6450Error(error=errmsg, command='set_rfport_voltage')

        self._put('-v{},-wV'.format(voltage_cmd))
        self._logger.debug('Set RF port voltage: "%s".', voltageout)

    def set_attenuation(self, attenuation):
        """Set the attenuation of GSS6450.

        A non-integer value is rounded with the built-in round() and a
        warning is logged.

        Args:
            attenuation: RPS attenuation level, in dB.
                Type, numerical.

        Raises:
            GSS6450Error: raise when attenuation is not in range.
        """
        # Written as a chained comparison on purpose: NaN fails it and is
        # therefore rejected as out of range.
        if not 0 <= attenuation <= 31:
            errmsg = ('"attenuation" must be within [0, 31], '
                      'current input is {}').format(str(attenuation))
            raise GSS6450Error(error=errmsg, command='set_attenuation')

        attenuation_raw = round(attenuation)

        if attenuation_raw != attenuation:
            self._logger.warning(
                '"attenuation" must be integer, current input '
                'will be rounded to {}'.format(attenuation_raw))

        self._put('-a{},-wA'.format(attenuation_raw))

        self._logger.debug('Set attenuation: %s dB.', attenuation_raw)

    def set_playback_offset(self, offset):
        """Set the playback offset of GSS6450.

        Args:
            offset: RPS playback offset. A timedelta is converted to
                seconds; a number is sent as given.
                Type, datetime.timedelta, or numerical.

        Raises:
            GSS6450Error: raise when offset is not numeric or timedelta.
        """
        if isinstance(offset, datetime.timedelta):
            offset_raw = offset.total_seconds()
        elif (isinstance(offset, numbers.Number)
              and not isinstance(offset, (bool, complex))):
            # bool and complex are numbers.Number instances, but they would
            # be formatted as 'True' / '(1+2j)' in the command, which is not
            # a number of seconds, so they are rejected.
            offset_raw = offset
        else:
            raise GSS6450Error(error=('"offset" must be numerical value or '
                                      'datetime.timedelta'),
                               command='set_playback_offset')

        self._put('-o{}'.format(offset_raw))

        self._logger.debug('Set playback offset: %s sec.', offset_raw)

    def set_storage_media(self, media):
        """Set the storage media of GSS6450.

        After sending the command the storage media is queried back and
        compared with the requested one.

        Args:
            media: RPS storage Media, Internal or External.
                Type, str. Option, 'internal', 'removable'

        Raises:
            GSS6450Error: raise when media option is not support, or when
                the media queried back differs from the requested one.
        """
        if media == 'internal':
            raw_media = '1'
        elif media == 'removable':
            raw_media = '2'
        else:
            raise GSS6450Error(
                error=('"media" input must be in ["internal", "removable"]. '
                       ' Current input is {}'.format(media)),
                command='set_storage_media')

        # NOTE(review): every other setter separates the value from the
        # '-w' flag with a comma (e.g. '-v{},-wV'); this one does not.
        # Confirm against the GSS6450 SHM_PUT documentation.
        self._put('-M{}-wM'.format(raw_media))

        # get_storage_media returns '1-INTERNAL' or '2-REMOVABLE'; its first
        # character is the numeric media code that was just sent.
        current_media = self.get_storage_media()
        if raw_media != current_media[0]:
            raise GSS6450Error(
                error=('Setting media "{}" is not the same as queried media '
                       '"{}".'.format(media, current_media)),
                command='set_storage_media')
382