#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper for an RF switch built on an Elexol EtherIO24.

The EtherIO is documented at
       http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php

This file is both a python module and a command line utility to speak
to the module.
"""

import binascii
import collections
import socket
import struct
import sys

import cellular_logging

log = cellular_logging.SetupCellularLogging('ether_io_rf_switch')


class Error(Exception):
    """Raised when a command could not be verified on the EtherIO24."""
    pass


def _ToBytes(data):
    """Returns data as a byte string, ASCII-encoding it if it is text."""
    if isinstance(data, bytes):
        return data
    return data.encode('ascii')


def _Hex(data):
    """Returns the hexadecimal form of a byte string as a native str."""
    return str(binascii.hexlify(data).decode('ascii'))


class EtherIo24(object):
    """Encapsulates an EtherIO24 UDP-GPIO bridge."""

    def __init__(self, hostname, port=2424):
        """Opens a UDP socket for talking to the device.

        Args:
            hostname: Host name or address of the EtherIO24.
            port: UDP port the device listens on.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', 0))
        self.destination = (hostname, port)
        self.socket.settimeout(3)  # In seconds

    def SendPayload(self, payload):
        """Sends the raw payload to the device."""
        self.socket.sendto(payload, self.destination)

    def SendOperation(self, opcode, list_bytes):
        """Sends the specified opcode with [list_bytes] as an argument.

        Args:
            opcode: Command opcode, as text or as a byte string.
            list_bytes: List of integers (0-255) appended as raw bytes.

        Returns:
            The payload that was sent.
        """
        arguments = struct.pack('=%dB' % len(list_bytes), *list_bytes)
        payload = _ToBytes(opcode) + arguments
        self.SendPayload(payload)
        return payload

    def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None):
        """Sends opcode and bytes, then reads to make sure it was executed.

        The write is followed by a read request; the command counts as
        executed when the reply is byte-identical to the write payload.
        Up to three attempts are made.

        Args:
            write_opcode: Opcode of the write command.
            list_bytes: List of integers (0-255) sent as the argument.
            read_opcode: Opcode used to read the value back.  Defaults to
                the lower-cased write opcode.

        Raises:
            Error: if no matching reply was received in any attempt.
        """
        if read_opcode is None:
            read_opcode = write_opcode.lower()

        write_sent = None
        for _ in range(3):
            write_sent = self.SendOperation(write_opcode, list_bytes)
            self.SendOperation(read_opcode, list_bytes)
            try:
                response = self.AwaitResponse()
            except socket.timeout:
                log.warning('Timed out awaiting reply for %s', write_opcode)
                continue
            if response == write_sent:
                return
            log.warning('Unexpected reply: sent %s, got %s',
                        _Hex(write_sent), _Hex(response))

        raise Error('Failed to execute %s' % _Hex(write_sent))

    def AwaitResponse(self):
        """Waits for one datagram and returns its payload.

        A warning is logged when the sender does not resolve to the same
        address as the configured destination; the payload is returned
        regardless.

        Raises:
            socket.timeout: if nothing arrives within the socket timeout.
        """
        (response, address) = self.socket.recvfrom(65536)
        sender = socket.gethostbyname(address[0])
        expected = socket.gethostbyname(self.destination[0])
        if sender != expected:
            log.warning('Unexpected reply source: %s (expected %s)',
                        address, self.destination)
        return response


class RfSwitch(object):
    """An RF switch hooked to an Elexol EtherIO24."""

    def __init__(self, ip):
        """Connects to the EtherIO24 at ip and builds the port table."""
        self.io = EtherIo24(ip)
        # Must run on pythons without 0bxxx notation.  These are 1110,
        # 1101, 1011, 0111
        decode = [0xe, 0xd, 0xb, 0x7]

        # Port number -> byte written to the device: the upper nibble
        # takes one of the first three patterns, the lower nibble one
        # of all four.
        self.port_mapping = [decode[upper] << 4 | decode[lower]
                             for upper in range(3)
                             for lower in range(4)]

    def SelectPort(self, n):
        """Connects port n to the RF generator.

        Args:
            n: Index into port_mapping, 0 <= n < len(port_mapping).

        Raises:
            IndexError: if n is out of range.  Negative values are
                rejected too, rather than silently wrapping around to a
                different port.  Nothing is sent to the device.
            Error: if the device does not confirm a command.
        """
        if not 0 <= n < len(self.port_mapping):
            raise IndexError('Port %r out of range (valid: 0-%d)' %
                             (n, len(self.port_mapping) - 1))

        # !A0: all pins output
        self.io.SendCommandVerify('!A', [0])
        self.io.SendCommandVerify('A', [self.port_mapping[n]])

    def Query(self):
        """Returns (binary port status, selected port, port direction).

        The selected port is None when the status byte does not match
        any entry of port_mapping.
        """
        self.io.SendOperation('!a', [])
        # bytearray yields integers on indexing under Python 2 and 3.
        raw_direction = bytearray(self.io.AwaitResponse())
        direction = raw_direction[2]

        self.io.SendOperation('a', [])
        status = bytearray(self.io.AwaitResponse())[1]
        try:
            port = self.port_mapping.index(status)
        except ValueError:
            port = None

        return status, port, direction


def CommandLineUtility(arguments):
    """Command line utility to control a switch.

    Args:
        arguments: collections.deque of command line arguments, without
            the program name: hostname {query|select portnumber}.
    """

    def Usage():
        print('usage: %s hostname {query|select portnumber}' % sys.argv[0])
        sys.exit(1)

    def Select(switch, remaining_args):
        try:
            port = int(remaining_args.popleft())
        except (IndexError, ValueError):
            # Port number missing or not an integer.
            Usage()
        if not 0 <= port < len(switch.port_mapping):
            print('Invalid port %d (valid: 0-%d)' %
                  (port, len(switch.port_mapping) - 1))
            Usage()
        switch.SelectPort(port)

    def Query(switch, unused_remaining_args):
        (raw_status, port, direction) = switch.Query()
        if direction != 0x00:
            print('Warning: Direction register is %x, should be 0x00' %
                  direction)
        port_str = 'Invalid' if port is None else str(port)
        print('Port %s (0x%x)' % (port_str, raw_status))

    # Only argument parsing is guarded, so an IndexError raised while
    # talking to the device is not misreported as a usage error.
    try:
        hostname = arguments.popleft()
        operation = arguments.popleft()
    except IndexError:
        Usage()

    handlers = {'query': Query, 'select': Select}
    handler = handlers.get(operation)
    if handler is None:
        Usage()

    handler(RfSwitch(hostname), arguments)


if __name__ == '__main__':
    CommandLineUtility(collections.deque(sys.argv[1:]))