#!/usr/bin/env python # Copyright (C) 2014 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Interface for a USB-connected Monsoon power meter (http://msoon.com/LabEquipment/PowerMonitor/). This file requires gflags, which requires setuptools. To install setuptools: sudo apt-get install python-setuptools To install gflags, see http://code.google.com/p/python-gflags/ To install pyserial, see http://pyserial.sourceforge.net/ Example usages: Set the voltage of the device 7536 to 4.0V python monsoon.py --voltage=4.0 --serialno 7536 Get 5000hz data from device number 7536, with unlimited number of samples python monsoon.py --samples -1 --hz 5000 --serialno 7536 Get 200Hz data for 5 seconds (1000 events) from default device python monsoon.py --samples 100 --hz 200 Get unlimited 200Hz data from device attached at /dev/ttyACM0 python monsoon.py --samples -1 --hz 200 --device /dev/ttyACM0 Output columns for collection with --samples, separated by space: TIMESTAMP OUTPUT OUTPUT_AVG USB USB_AVG | | | | | | | ` (if --includeusb and --avg) | | ` (if --includeusb) | ` (if --avg) ` (if --timestamp) """ import fcntl import os import select import signal import stat import struct import sys import time import collections import gflags as flags # http://code.google.com/p/python-gflags/ import serial # http://pyserial.sourceforge.net/ FLAGS = flags.FLAGS class Monsoon: """ Provides a simple class to use the power meter, e.g. mon = monsoon.Monsoon() mon.SetVoltage(3.7) mon.StartDataCollection() mydata = [] while len(mydata) < 1000: mydata.extend(mon.CollectData()) mon.StopDataCollection() """ def __init__(self, device=None, serialno=None, wait=1): """ Establish a connection to a Monsoon. By default, opens the first available port, waiting if none are ready. A particular port can be specified with "device", or a particular Monsoon can be specified with "serialno" (using the number printed on its back). With wait=0, IOError is thrown if a device is not immediately available. """ self._coarse_ref = self._fine_ref = self._coarse_zero = self._fine_zero = 0 self._coarse_scale = self._fine_scale = 0 self._last_seq = 0 self.start_voltage = 0 if device: self.ser = serial.Serial(device, timeout=1) return while True: # try all /dev/ttyACM* until we find one we can use for dev in os.listdir("/dev"): if not dev.startswith("ttyACM"): continue tmpname = "/tmp/monsoon.%s.%s" % (os.uname()[0], dev) self._tempfile = open(tmpname, "w") try: os.chmod(tmpname, 0666) except OSError: pass try: # use a lockfile to ensure exclusive access fcntl.lockf(self._tempfile, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError as e: print >>sys.stderr, "device %s is in use" % dev continue try: # try to open the device self.ser = serial.Serial("/dev/%s" % dev, timeout=1) self.StopDataCollection() # just in case self._FlushInput() # discard stale input status = self.GetStatus() except Exception as e: print >>sys.stderr, "error opening device %s: %s" % (dev, e) continue if not status: print >>sys.stderr, "no response from device %s" % dev elif serialno and status["serialNumber"] != serialno: print >>sys.stderr, ("Note: another device serial #%d seen on %s" % (status["serialNumber"], dev)) else: self.start_voltage = status["voltage1"] return self._tempfile = None if not wait: raise IOError("No device found") print >>sys.stderr, "waiting for device..." time.sleep(1) def GetStatus(self): """ Requests and waits for status. Returns status dictionary. """ # status packet format STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH" STATUS_FIELDS = [ "packetType", "firmwareVersion", "protocolVersion", "mainFineCurrent", "usbFineCurrent", "auxFineCurrent", "voltage1", "mainCoarseCurrent", "usbCoarseCurrent", "auxCoarseCurrent", "voltage2", "outputVoltageSetting", "temperature", "status", "leds", "mainFineResistor", "serialNumber", "sampleRate", "dacCalLow", "dacCalHigh", "powerUpCurrentLimit", "runTimeCurrentLimit", "powerUpTime", "usbFineResistor", "auxFineResistor", "initialUsbVoltage", "initialAuxVoltage", "hardwareRevision", "temperatureLimit", "usbPassthroughMode", "mainCoarseResistor", "usbCoarseResistor", "auxCoarseResistor", "defMainFineResistor", "defUsbFineResistor", "defAuxFineResistor", "defMainCoarseResistor", "defUsbCoarseResistor", "defAuxCoarseResistor", "eventCode", "eventData", ] self._SendStruct("BBB", 0x01, 0x00, 0x00) while True: # Keep reading, discarding non-status packets bytes = self._ReadPacket() if not bytes: return None if len(bytes) != struct.calcsize(STATUS_FORMAT) or bytes[0] != "\x10": print >>sys.stderr, "wanted status, dropped type=0x%02x, len=%d" % ( ord(bytes[0]), len(bytes)) continue status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, bytes))) assert status["packetType"] == 0x10 for k in status.keys(): if k.endswith("VoltageSetting"): status[k] = 2.0 + status[k] * 0.01 elif k.endswith("FineCurrent"): pass # needs calibration data elif k.endswith("CoarseCurrent"): pass # needs calibration data elif k.startswith("voltage") or k.endswith("Voltage"): status[k] = status[k] * 0.000125 elif k.endswith("Resistor"): status[k] = 0.05 + status[k] * 0.0001 if k.startswith("aux") or k.startswith("defAux"): status[k] += 0.05 elif k.endswith("CurrentLimit"): status[k] = 8 * (1023 - status[k]) / 1023.0 return status def RampVoltage(self, start, end): v = start if v < 3.0: v = 3.0 # protocol doesn't support lower than this while (v < end): self.SetVoltage(v) v += .1 time.sleep(.1) self.SetVoltage(end) def SetVoltage(self, v): """ Set the output voltage, 0 to disable. """ if v == 0: self._SendStruct("BBB", 0x01, 0x01, 0x00) else: self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100)) def SetMaxCurrent(self, i): """Set the max output current.""" assert i >= 0 and i <= 8 val = 1023 - int((i/8)*1023) self._SendStruct("BBB", 0x01, 0x0a, val & 0xff) self._SendStruct("BBB", 0x01, 0x0b, val >> 8) def SetUsbPassthrough(self, val): """ Set the USB passthrough mode: 0 = off, 1 = on, 2 = auto. """ self._SendStruct("BBB", 0x01, 0x10, val) def StartDataCollection(self): """ Tell the device to start collecting and sending measurement data. """ self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8) def StopDataCollection(self): """ Tell the device to stop collecting measurement data. """ self._SendStruct("BB", 0x03, 0x00) # stop def CollectData(self): """ Return some current samples. Call StartDataCollection() first. """ while True: # loop until we get data or a timeout bytes = self._ReadPacket() if not bytes: return None if len(bytes) < 4 + 8 + 1 or bytes[0] < "\x20" or bytes[0] > "\x2F": print >>sys.stderr, "wanted data, dropped type=0x%02x, len=%d" % ( ord(bytes[0]), len(bytes)) continue seq, type, x, y = struct.unpack("BBBB", bytes[:4]) data = [struct.unpack(">hhhh", bytes[x:x+8]) for x in range(4, len(bytes) - 8, 8)] if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF: print >>sys.stderr, "data sequence skipped, lost packet?" self._last_seq = seq if type == 0: if not self._coarse_scale or not self._fine_scale: print >>sys.stderr, "waiting for calibration, dropped data packet" continue def scale(val): if val & 1: return ((val & ~1) - self._coarse_zero) * self._coarse_scale else: return (val - self._fine_zero) * self._fine_scale out_main = [] out_usb = [] for main, usb, aux, voltage in data: out_main.append(scale(main)) out_usb.append(scale(usb)) return (out_main, out_usb) elif type == 1: self._fine_zero = data[0][0] self._coarse_zero = data[1][0] # print >>sys.stderr, "zero calibration: fine 0x%04x, coarse 0x%04x" % ( # self._fine_zero, self._coarse_zero) elif type == 2: self._fine_ref = data[0][0] self._coarse_ref = data[1][0] # print >>sys.stderr, "ref calibration: fine 0x%04x, coarse 0x%04x" % ( # self._fine_ref, self._coarse_ref) else: print >>sys.stderr, "discarding data packet type=0x%02x" % type continue if self._coarse_ref != self._coarse_zero: self._coarse_scale = 2.88 / (self._coarse_ref - self._coarse_zero) if self._fine_ref != self._fine_zero: self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero) def _SendStruct(self, fmt, *args): """ Pack a struct (without length or checksum) and send it. """ data = struct.pack(fmt, *args) data_len = len(data) + 1 checksum = (data_len + sum(struct.unpack("B" * len(data), data))) % 256 out = struct.pack("B", data_len) + data + struct.pack("B", checksum) self.ser.write(out) def _ReadPacket(self): """ Read a single data record as a string (without length or checksum). """ len_char = self.ser.read(1) if not len_char: print >>sys.stderr, "timeout reading from serial port" return None data_len = struct.unpack("B", len_char) data_len = ord(len_char) if not data_len: return "" result = self.ser.read(data_len) if len(result) != data_len: return None body = result[:-1] checksum = (data_len + sum(struct.unpack("B" * len(body), body))) % 256 if result[-1] != struct.pack("B", checksum): print >>sys.stderr, "invalid checksum from serial port" return None return result[:-1] def _FlushInput(self): """ Flush all read data until no more available. """ self.ser.flush() flushed = 0 while True: ready_r, ready_w, ready_x = select.select([self.ser], [], [self.ser], 0) if len(ready_x) > 0: print >>sys.stderr, "exception from serial port" return None elif len(ready_r) > 0: flushed += 1 self.ser.read(1) # This may cause underlying buffering. self.ser.flush() # Flush the underlying buffer too. else: break if flushed > 0: print >>sys.stderr, "dropped >%d bytes" % flushed def main(argv): """ Simple command-line interface for Monsoon.""" useful_flags = ["voltage", "status", "usbpassthrough", "samples", "current"] if not [f for f in useful_flags if FLAGS.get(f, None) is not None]: print __doc__.strip() print FLAGS.MainModuleHelp() return if FLAGS.includeusb: num_channels = 2 else: num_channels = 1 if FLAGS.avg and FLAGS.avg < 0: print "--avg must be greater than 0" return mon = Monsoon(device=FLAGS.device, serialno=FLAGS.serialno) if FLAGS.voltage is not None: if FLAGS.ramp is not None: mon.RampVoltage(mon.start_voltage, FLAGS.voltage) else: mon.SetVoltage(FLAGS.voltage) if FLAGS.current is not None: mon.SetMaxCurrent(FLAGS.current) if FLAGS.status: items = sorted(mon.GetStatus().items()) print "\n".join(["%s: %s" % item for item in items]) if FLAGS.usbpassthrough: if FLAGS.usbpassthrough == 'off': mon.SetUsbPassthrough(0) elif FLAGS.usbpassthrough == 'on': mon.SetUsbPassthrough(1) elif FLAGS.usbpassthrough == 'auto': mon.SetUsbPassthrough(2) else: sys.exit('bad passthrough flag: %s' % FLAGS.usbpassthrough) if FLAGS.samples: # Make sure state is normal mon.StopDataCollection() status = mon.GetStatus() native_hz = status["sampleRate"] * 1000 # Collect and average samples as specified mon.StartDataCollection() # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant: # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz # This is the error accumulator in a variation of Bresenham's algorithm. emitted = offset = 0 chan_buffers = tuple([] for _ in range(num_channels)) # past n samples for rolling average history_deques = tuple(collections.deque() for _ in range(num_channels)) try: last_flush = time.time() while emitted < FLAGS.samples or FLAGS.samples == -1: # The number of raw samples to consume before emitting the next output need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz if need > len(chan_buffers[0]): # still need more input samples chans_samples = mon.CollectData() if not all(chans_samples): break for chan_buffer, chan_samples in zip(chan_buffers, chans_samples): chan_buffer.extend(chan_samples) else: # Have enough data, generate output samples. # Adjust for consuming 'need' input samples. offset += need * FLAGS.hz while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz this_sample = [sum(chan[:need]) / need for chan in chan_buffers] if FLAGS.timestamp: print int(time.time()), if FLAGS.avg: chan_avgs = [] for chan_deque, chan_sample in zip(history_deques, this_sample): chan_deque.appendleft(chan_sample) if len(chan_deque) > FLAGS.avg: chan_deque.pop() chan_avgs.append(sum(chan_deque) / len(chan_deque)) # Interleave channel rolling avgs with latest channel data data_to_print = [datum for pair in zip(this_sample, chan_avgs) for datum in pair] else: data_to_print = this_sample fmt = ' '.join('%f' for _ in data_to_print) print fmt % tuple(data_to_print) sys.stdout.flush() offset -= native_hz emitted += 1 # adjust for emitting 1 output sample chan_buffers = tuple(c[need:] for c in chan_buffers) now = time.time() if now - last_flush >= 0.99: # flush every second sys.stdout.flush() last_flush = now except KeyboardInterrupt: print >>sys.stderr, "interrupted" mon.StopDataCollection() if __name__ == '__main__': # Define flags here to avoid conflicts with people who use us as a library flags.DEFINE_boolean("status", None, "Print power meter status") flags.DEFINE_integer("avg", None, "Also report average over last n data points") flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)") flags.DEFINE_float("current", None, "Set max output current") flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)") flags.DEFINE_integer("samples", None, "Collect and print this many samples. " "-1 means collect indefinitely.") flags.DEFINE_integer("hz", 5000, "Print this many samples/sec") flags.DEFINE_string("device", None, "Path to the device in /dev/... (ex:/dev/ttyACM1)") flags.DEFINE_integer("serialno", None, "Look for a device with this serial number") flags.DEFINE_boolean("timestamp", None, "Also print integer (seconds) timestamp on each line") flags.DEFINE_boolean("ramp", True, "Gradually increase voltage") flags.DEFINE_boolean("includeusb", False, "Include measurements from USB channel") main(FLAGS(sys.argv))