• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #!/usr/bin/python
2 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5 
6 import cellular_logging
7 import cellular_system_error
8 
# Module-level logging object for this file, created by
# cellular_logging.SetupCellularLogging under the name 'scpi_driver'.
# _ErrorCheckerContext.__enter__ uses it to emit debug output.
log = cellular_logging.SetupCellularLogging('scpi_driver')
10 
11 
class _ErrorCheckerContext(object):
    """Reference-count our error-checking state and only check for
    errors when we take the first ref or drop the last ref.

    This way, we can minimize the number of checks; each one takes a
    bit of time.  You will likely want to set always_check to True when
    debugging new SCPI interactions; with always_check set, every entry
    and every exit performs a check regardless of nesting depth.

    On first entry, we check for errors, but do not stop if we find
    them; these are errors that were accumulated on the device before
    this test ran.  On exit the check is made with the default arguments
    of scpi._WaitAndFetchErrors.

    Attributes:
      always_check: check on every enter/exit, not only the outermost.
      scpi: object providing _WaitAndFetchErrors(raise_on_error=...).
      depth: number of currently active (nested) ``with`` blocks.
      raise_on_error: stored here but never read by this class.
    """

    def __init__(self, scpi):
        """Creates an idle (depth 0) checker bound to scpi.

        Arguments:
          scpi: object whose _WaitAndFetchErrors method performs the
            actual error check.
        """
        self.always_check = True  # True for serious debugging
        self.scpi = scpi
        self.depth = 0
        self.raise_on_error = True

    def __enter__(self):
        """Takes a reference; checks for stale errors when appropriate.

        Returns:
          This context object.
        """
        log.debug('ErrorCheckerContext Depth: %s' % self.depth)
        if self.depth == 0 or self.always_check:
            # Never raise when clearing old errors; the fetched list is
            # intentionally discarded.
            self.scpi._WaitAndFetchErrors(raise_on_error=False)
        self.depth += 1
        return self

    def __exit__(self, type, value, traceback):
        """Drops a reference; checks for new errors when appropriate.

        The exception arguments are ignored.  Nothing is returned, so an
        exception raised inside the ``with`` body is never suppressed.
        If the error check itself raises, that exception is the one that
        leaves the ``with`` statement.
        """
        self.depth -= 1
        if self.depth <= 0 or self.always_check:
            self.scpi._WaitAndFetchErrors()
44 
45 
class Scpi(object):
    """Wrapper for SCPI.

    SCPI = "standard commands for programmable instruments",
    a relative of GPIB.

    The SCPI driver must export:  Query, Send, Reset and Close
    """

    def __init__(self, driver, opc_on_stanza=False):
        """Wraps driver and creates the shared error-checking context.

        Arguments:
          driver: object exporting Query, Send, Reset and Close.
          opc_on_stanza: if True, SendStanza issues '*OPC?' after each
            command it sends.
        """
        self.driver = driver
        self.opc_on_stanza = opc_on_stanza
        self.checker_context = _ErrorCheckerContext(self)

    def Query(self, command):
        """Send the SCPI command and return the response."""
        return self.driver.Query(command)

    def Send(self, command):
        """Send the SCPI command."""
        self.driver.Send(command)

    def Reset(self):
        """Tell the device to reset with *RST."""
        # Some devices (like the prologix) require special handling for
        # reset, so the reset is delegated to the driver.
        self.driver.Reset()

    def Close(self):
        """Close the device."""
        self.driver.Close()

    def RetrieveErrors(self):
        """Retrieves all SYSTem:ERRor messages from the device.

        Queries 'SYSTem:ERRor?' until a response containing
        '+0,"No error"' is seen, then sends '*CLS'.

        Returns:
          List of the error responses that were not filtered out as
          benign, in the reverse of the order they were read.
        """
        errors = []
        while True:
            error = self.Query('SYSTem:ERRor?')
            if '+0,"No error"' in error:
                # We've reached the end of the error stack
                break

            if '-420' in error and 'Query UNTERMINATED' in error:
                # This is benign; the GPIB bridge asked for a response when
                # the device didn't have one to give.

                # TODO(rochberg): This is a layering violation; we should
                # really only accept -420 if the underlying driver is in a
                # mode that is known to cause this
                continue

            if '+292' in error and 'Data arrived on unknown SAPI' in error:
                # This may be benign; It is known to occur when we do a switch
                # from GPRS to WCDMA
                continue

            errors.append(error)

        self.Send('*CLS')           # Clear status
        errors.reverse()
        return errors

    def _WaitAndFetchErrors(self, raise_on_error=True):
        """Waits for command completion, returns errors.

        Arguments:
          raise_on_error: if True, raise when any error was retrieved.

        Returns:
          The list produced by RetrieveErrors.

        Raises:
          cellular_system_error.BadScpiCommand: errors were retrieved and
            raise_on_error is True; the message is the errors joined by
            newlines.
        """
        self.Query('*OPC?')      # Wait for operation complete
        errors = self.RetrieveErrors()
        if errors and raise_on_error:
            raise cellular_system_error.BadScpiCommand('\n'.join(errors))
        return errors

    def SimpleVerify(self, command, arg):
        """Sends "command arg", then "command?", expecting arg back.

        Arguments:
          command: SCPI command
          arg: Argument.  The response is compared for exact equality
            with the text that was sent for arg: you should send strings
            quoted with " because that's what the 8960 returns.
            We also fail if you send 1 and receive +1 back.

        Raises:
          cellular_system_error.BadScpiCommand:  Verification failed
        """
        # NOTE(review): this sets an attribute on the Scpi object, while
        # the always_check flag that is actually consulted lives on
        # self.checker_context, so this line does not change how often
        # errors are checked.  Left untouched to preserve behavior;
        # confirm the intent before redirecting or removing it.
        self.always_check = False
        # The response is a string, so compare it with the formatted text
        # that goes out on the wire; a non-string arg (e.g. the int 1)
        # would otherwise never compare equal.
        expected = '%s' % (arg,)
        with self.checker_context:
            self.Send('%s %s' % (command, arg))
            result = self.Query('%s?' % (command,))
            if result != expected:
                raise cellular_system_error.BadScpiCommand(
                    'Error on %s: sent %s, got %s' % (command, arg, result))

    def SendStanza(self, commands):
        """
        Sends a list of commands and verifies that they complete correctly.

        The commands are sent inside self.checker_context, which performs
        the error checks.  When opc_on_stanza is set, '*OPC?' is queried
        after each command.

        Arguments:
          commands: iterable of SCPI command strings.
        """
        with self.checker_context:
            for c in commands:
                self.Send(c)
                if self.opc_on_stanza:
                    self.Query('*OPC?')
147