1#!/usr/bin/env python 2# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6# Description: 7# 8# Class for handling linux 'evdev' input devices. 9# 10# Provides evtest-like functionality if run from the command line: 11# $ input_device.py -d /dev/input/event6 12 13""" Read properties and events of a linux input device. """ 14 15import array 16import copy 17import fcntl 18import os.path 19import select 20import struct 21import time 22 23from collections import OrderedDict 24 25from linux_input import * 26 27 28class Valuator: 29 """ A Valuator just stores a value """ 30 def __init__(self): 31 self.value = 0 32 33class SwValuator(Valuator): 34 """ A Valuator used for EV_SW (switch) events """ 35 def __init__(self, value): 36 self.value = value 37 38class AbsValuator(Valuator): 39 """ 40 An AbsValuator, used for EV_ABS events stores a value as well as other 41 properties of the corresponding absolute axis. 42 """ 43 def __init__(self, value, min_value, max_value, fuzz, flat, resolution): 44 self.value = value 45 self.min = min_value 46 self.max = max_value 47 self.fuzz = fuzz 48 self.flat = flat 49 self.resolution = resolution 50 51 52class InputEvent: 53 """ 54 Linux evdev input event 55 56 An input event has the following fields which can be accessed as public 57 properties of this class: 58 tv_sec 59 tv_usec 60 type 61 code 62 value 63 """ 64 def __init__(self, tv_sec=0, tv_usec=0, type=0, code=0, value=0): 65 self.format = input_event_t 66 self.format_size = struct.calcsize(self.format) 67 (self.tv_sec, self.tv_usec, self.type, self.code, 68 self.value) = (tv_sec, tv_usec, type, code, value) 69 70 def read(self, stream): 71 """ Read an input event from the provided stream and unpack it. """ 72 packed = stream.read(self.format_size) 73 (self.tv_sec, self.tv_usec, self.type, self.code, 74 self.value) = struct.unpack(self.format, packed) 75 76 def write(self, stream): 77 """ Pack an input event and write it to the provided stream. """ 78 packed = struct.pack(self.format, self.tv_sec, self.tv_usec, self.type, 79 self.code, self.value) 80 stream.write(packed) 81 stream.flush() 82 83 def __str__(self): 84 t = EV_TYPES.get(self.type, self.type) 85 if self.type in EV_STRINGS: 86 c = EV_STRINGS[self.type].get(self.code, self.code) 87 else: 88 c = self.code 89 return ('%d.%06d: %s[%s] = %d' % 90 (self.tv_sec, self.tv_usec, t, c, self.value)) 91 92 93class InputDevice: 94 """ 95 Linux evdev input device 96 97 A linux kernel "evdev" device sends a stream of "input events". 98 These events are grouped together into "input reports", which is a set of 99 input events ending in a single EV_SYN/SYN_REPORT event. 100 101 Each input event is tagged with a type and a code. 102 A given input device supports a subset of the possible types, and for 103 each type it supports a subset of the possible codes for that type. 104 105 The device maintains a "valuator" for each supported type/code pairs. 106 There are two types of "valuators": 107 Normal valuators represent just a value. 108 Absolute valuators are only for EV_ABS events. They have more fields: 109 value, minimum, maximum, resolution, fuzz, flatness 110 Note: Relative and Absolute "Valuators" are also often called relative 111 and absolute axis, respectively. 112 113 The evdev protocol is stateful. Input events are only sent when the values 114 of a valuator actually changes. This dramatically reduces the stream of 115 events emenating from the kernel. 116 117 Multitouch devices are a special case. There are two types of multitouch 118 devices defined in the kernel: 119 Multitouch type "A" (MT-A) devices: 120 In each input report, the device sends an unordered list of all 121 active contacts. The data for each active contact is separated 122 in the input report by an EV_SYN/SYN_MT_REPORT event. 123 Thus, the MT-A contact event stream is not stateful. 124 Note: MT-A is not currently supported by this class. 125 126 Multitouch type "B" (MT-B) devices: 127 The device maintains a list of slots, where each slot contains a 128 single contact. In each input report, the device only sends 129 information about the slots that have changed. 130 Thus, the MT-B contact event stream is stateful. 131 When reporting multiple slots, the EV_ABS/MT_SLOT valuator is used 132 to indicate the 'current' slot for which subsequent EV_ABS/ABS_MT_* 133 valuator events apply. 134 An inactive slot has EV_ABS/ABS_MT_TRACKING_ID == -1 135 Active slots have EV_ABS/ABS_MT_TRACKING_ID >= 0 136 137 Besides maintaining the set of supported ABS_MT valuators in the supported 138 valuator list, a array of slots is also maintained. Each slot has its own 139 unique copy of just the supported ABS_MT valuators. This represents the 140 current state of that slot. 141 """ 142 143 def __init__(self, path, ev_syn_cb=None): 144 """ 145 Constructor opens the device file and probes its properties. 146 147 Note: The device file is left open when the constructor exits. 148 """ 149 self.path = path 150 self.ev_syn_cb = ev_syn_cb 151 self.events = {} # dict { ev_type : dict { ev_code : Valuator } } 152 self.mt_slots = [] # [ dict { mt_code : AbsValuator } ] * |MT-B slots| 153 154 # Open the device node, and use ioctls to probe its properties 155 self.f = None 156 self.f = open(path, 'rb+', buffering=0) 157 self._ioctl_version() 158 self._ioctl_id() 159 self._ioctl_name() 160 for t in self._ioctl_types(): 161 self._ioctl_codes(t) 162 self._setup_mt_slots() 163 164 def __del__(self): 165 """ 166 Deconstructor closes the device file, if it is open. 167 """ 168 if self.f and not self.f.closed: 169 self.f.close() 170 171 def process_event(self, ev): 172 """ 173 Processes an incoming input event. 174 175 Returns True for EV_SYN/SYN_REPORT events to indicate that a complete 176 input report has been received. 177 178 Returns False for other events. 179 180 Events not supported by this device are silently ignored. 181 182 For MT events, updates the slot valuator value for the current slot. 183 If current slot is the 'primary' slot, also updates the events entry. 184 185 For all other events, updates the corresponding valuator value. 186 """ 187 if ev.type == EV_SYN and ev.code == SYN_REPORT: 188 return True 189 elif ev.type not in self.events or ev.code not in self.events[ev.type]: 190 return False 191 elif self.is_mt_b() and ev.type == EV_ABS and ev.code in ABS_MT_RANGE: 192 # TODO: Handle MT-A 193 slot = self._get_current_slot() 194 slot[ev.code].value = ev.value 195 # if the current slot is the "primary" slot, 196 # update the events[] entry, too. 197 if slot == self._get_mt_primary_slot(): 198 self.events[ev.type][ev.code].value = ev.value 199 else: 200 self.events[ev.type][ev.code].value = ev.value 201 return False 202 203 def _ioctl_version(self): 204 """ Queries device file for version information. """ 205 # Version is a 32-bit integer, which encodes 8-bit major version, 206 # 8-bit minor version and 16-bit revision. 207 version = array.array('I', [0]) 208 fcntl.ioctl(self.f, EVIOCGVERSION, version, 1) 209 self.version = (version[0] >> 16, (version[0] >> 8) & 0xff, 210 version[0] & 0xff) 211 212 def _ioctl_id(self): 213 """ Queries device file for input device identification. """ 214 # struct input_id is 4 __u16 215 gid = array.array('H', [0] * 4) 216 fcntl.ioctl(self.f, EVIOCGID, gid, 1) 217 self.id_bus = gid[ID_BUS] 218 self.id_vendor = gid[ID_VENDOR] 219 self.id_product = gid[ID_PRODUCT] 220 self.id_version = gid[ID_VERSION] 221 222 def _ioctl_name(self): 223 """ Queries device file for the device name. """ 224 # Device name is a C-string up to 255 bytes in length. 225 name_len = 255 226 name = array.array('B', [0] * name_len) 227 name_len = fcntl.ioctl(self.f, EVIOCGNAME(name_len), name, 1) 228 self.name = name[0:name_len-1].tostring() 229 230 def _ioctl_get_switch(self, sw): 231 """ 232 Queries device file for current value of all switches and returns 233 a boolean indicating whether the switch sw is set. 234 """ 235 size = SW_CNT / 8 # Buffer size of one __u16 236 buf = array.array('H', [0]) 237 fcntl.ioctl(self.f, EVIOCGSW(size), buf) 238 return SwValuator(((buf[0] >> sw) & 0x01) == 1) 239 240 def _ioctl_absinfo(self, axis): 241 """ 242 Queries device file for absinfo structure for given absolute axis. 243 """ 244 # struct input_absinfo is 6 __s32 245 a = array.array('i', [0] * 6) 246 fcntl.ioctl(self.f, EVIOCGABS(axis), a, 1) 247 return AbsValuator(a[0], a[1], a[2], a[3], a[4], a[5]) 248 249 def _ioctl_codes(self, ev_type): 250 """ 251 Queries device file for supported event codes for given event type. 252 """ 253 self.events[ev_type] = {} 254 if ev_type not in EV_SIZES: 255 return 256 257 size = EV_SIZES[ev_type] / 8 # Convert bits to bytes 258 ev_code = array.array('B', [0] * size) 259 try: 260 count = fcntl.ioctl(self.f, EVIOCGBIT(ev_type, size), ev_code, 1) 261 for c in range(count * 8): 262 if test_bit(c, ev_code): 263 if ev_type == EV_ABS: 264 self.events[ev_type][c] = self._ioctl_absinfo(c) 265 elif ev_type == EV_SW: 266 self.events[ev_type][c] = self._ioctl_get_switch(c) 267 else: 268 self.events[ev_type][c] = Valuator() 269 except IOError as (errno, strerror): 270 # Errno 22 signifies that this event type has no event codes. 271 if errno != 22: 272 raise 273 274 def _ioctl_types(self): 275 """ Queries device file for supported event types. """ 276 ev_types = array.array('B', [0] * (EV_CNT / 8)) 277 fcntl.ioctl(self.f, EVIOCGBIT(EV_SYN, EV_CNT / 8), ev_types, 1) 278 types = [] 279 for t in range(EV_CNT): 280 if test_bit(t, ev_types): 281 types.append(t) 282 return types 283 284 def _convert_slot_index_to_slot_id(self, index): 285 """ Convert a slot index in self.mt_slots to its slot id. """ 286 return self.abs_mt_slot.min + index 287 288 def _ioctl_mt_slots(self): 289 """Query mt slots values using ioctl. 290 291 The ioctl buffer argument should be binary equivalent to 292 struct input_mt_request_layout { 293 __u32 code; 294 __s32 values[num_slots]; 295 296 Note that the slots information returned by EVIOCGMTSLOTS 297 corresponds to the slot ids ranging from abs_mt_slot.min to 298 abs_mt_slot.max which is not necessarily the same as the 299 slot indexes ranging from 0 to num_slots - 1 in self.mt_slots. 300 We need to map between the slot index and the slot id correctly. 301 }; 302 """ 303 # Iterate through the absolute mt events that are supported. 304 for c in range(ABS_MT_FIRST, ABS_MT_LAST): 305 if c not in self.events[EV_ABS]: 306 continue 307 # Sync with evdev kernel driver for the specified code c. 308 mt_slot_info = array.array('i', [c] + [0] * self.num_slots) 309 mt_slot_info_len = (self.num_slots + 1) * mt_slot_info.itemsize 310 fcntl.ioctl(self.f, EVIOCGMTSLOTS(mt_slot_info_len), mt_slot_info) 311 values = mt_slot_info[1:] 312 for slot_index in range(self.num_slots): 313 slot_id = self._convert_slot_index_to_slot_id(slot_index) 314 self.mt_slots[slot_index][c].value = values[slot_id] 315 316 def _setup_mt_slots(self): 317 """ 318 Sets up the device's mt_slots array. 319 320 Each element of the mt_slots array is initialized as a deepcopy of a 321 dict containing all of the MT valuators from the events dict. 322 """ 323 # TODO(djkurtz): MT-A 324 if not self.is_mt_b(): 325 return 326 ev_abs = self.events[EV_ABS] 327 # Create dict containing just the MT valuators 328 mt_abs_info = dict((axis, ev_abs[axis]) 329 for axis in ev_abs 330 if axis in ABS_MT_RANGE) 331 332 # Initialize TRACKING_ID to -1 333 mt_abs_info[ABS_MT_TRACKING_ID].value = -1 334 335 # Make a copy of mt_abs_info for each MT slot 336 self.abs_mt_slot = ev_abs[ABS_MT_SLOT] 337 self.num_slots = self.abs_mt_slot.max - self.abs_mt_slot.min + 1 338 for s in range(self.num_slots): 339 self.mt_slots.append(copy.deepcopy(mt_abs_info)) 340 341 self._ioctl_mt_slots() 342 343 def get_current_slot_id(self): 344 """ 345 Return the current slot id. 346 """ 347 if not self.is_mt_b(): 348 return None 349 return self.events[EV_ABS][ABS_MT_SLOT].value 350 351 def _get_current_slot(self): 352 """ 353 Returns the current slot, as indicated by the last ABS_MT_SLOT event. 354 """ 355 current_slot_id = self.get_current_slot_id() 356 if current_slot_id is None: 357 return None 358 return self.mt_slots[current_slot_id] 359 360 def _get_tid(self, slot): 361 """ Returns the tracking_id for the given MT slot. """ 362 return slot[ABS_MT_TRACKING_ID].value 363 364 def _get_mt_valid_slots(self): 365 """ 366 Returns a list of valid slots. 367 368 A valid slot is a slot whose tracking_id != -1. 369 """ 370 return [s for s in self.mt_slots if self._get_tid(s) != -1] 371 372 def _get_mt_primary_slot(self): 373 """ 374 Returns the "primary" MT-B slot. 375 376 The "primary" MT-B slot is arbitrarily chosen as the slot with lowest 377 tracking_id (> -1). It is used to make an MT-B device look like 378 single-touch (ST) device. 379 """ 380 slot = None 381 for s in self.mt_slots: 382 tid = self._get_tid(s) 383 if tid < 0: 384 continue 385 if not slot or tid < self._get_tid(slot): 386 slot = s 387 return slot 388 389 def _code_if_mt(self, type, code): 390 """ 391 Returns MT-equivalent event code for certain specific event codes 392 """ 393 if type != EV_ABS: 394 return code 395 elif code == ABS_X: 396 return ABS_MT_POSITION_X 397 elif code == ABS_Y: 398 return ABS_MT_POSITION_Y 399 elif code == ABS_PRESSURE: 400 return ABS_MT_PRESSURE 401 elif code == ABS_TOOL_WIDTH: 402 return ABS_TOUCH_MAJOR 403 else: 404 return code 405 406 def _get_valuator(self, type, code): 407 """ Returns Valuator for given event type and code """ 408 if (not type in self.events) or (not code in self.events[type]): 409 return None 410 if type == EV_ABS: 411 code = self._code_if_mt(type, code) 412 return self.events[type][code] 413 414 def _get_value(self, type, code): 415 """ 416 Returns the value of the valuator with the give event (type, code). 417 """ 418 axis = self._get_valuator(type, code) 419 if not axis: 420 return None 421 return axis.value 422 423 def _get_min(self, type, code): 424 """ 425 Returns the min value of the valuator with the give event (type, code). 426 427 Note: Only AbsValuators (EV_ABS) have max values. 428 """ 429 axis = self._get_valuator(type, code) 430 if not axis: 431 return None 432 return axis.min 433 434 def _get_max(self, type, code): 435 """ 436 Returns the min value of the valuator with the give event (type, code). 437 438 Note: Only AbsValuators (EV_ABS) have max values. 439 """ 440 axis = self._get_valuator(type, code) 441 if not axis: 442 return None 443 return axis.max 444 445 """ Public accessors """ 446 447 def get_num_fingers(self): 448 if self.is_mt_b(): 449 return len(self._get_mt_valid_slots()) 450 elif self.is_mt_a(): 451 return 0 # TODO(djkurtz): MT-A 452 else: # Single-Touch case 453 if not self._get_value(EV_KEY, BTN_TOUCH) == 1: 454 return 0 455 elif self._get_value(EV_KEY, BTN_TOOL_TRIPLETAP) == 1: 456 return 3 457 elif self._get_value(EV_KEY, BTN_TOOL_DOUBLETAP) == 1: 458 return 2 459 elif self._get_value(EV_KEY, BTN_TOOL_FINGER) == 1: 460 return 1 461 else: 462 return 0 463 464 def get_x(self): 465 return self._get_value(EV_ABS, ABS_X) 466 467 def get_x_min(self): 468 return self._get_min(EV_ABS, ABS_X) 469 470 def get_x_max(self): 471 return self._get_max(EV_ABS, ABS_X) 472 473 def get_y(self): 474 return self._get_value(EV_ABS, ABS_Y) 475 476 def get_y_min(self): 477 return self._get_min(EV_ABS, ABS_Y) 478 479 def get_y_max(self): 480 return self._get_max(EV_ABS, ABS_Y) 481 482 def get_pressure(self): 483 return self._get_value(EV_ABS, ABS_PRESSURE) 484 485 def get_pressure_min(self): 486 return self._get_min(EV_ABS, ABS_PRESSURE) 487 488 def get_pressure_max(self): 489 return self._get_max(EV_ABS, ABS_PRESSURE) 490 491 def get_left(self): 492 return int(self._get_value(EV_KEY, BTN_LEFT) == 1) 493 494 def get_right(self): 495 return int(self._get_value(EV_KEY, BTN_RIGHT) == 1) 496 497 def get_middle(self): 498 return int(self._get_value(EV_KEY, BTN_MIDDLE) == 1) 499 500 def get_microphone_insert(self): 501 return self._get_value(EV_SW, SW_MICROPHONE_INSERT) 502 503 def get_headphone_insert(self): 504 return self._get_value(EV_SW, SW_HEADPHONE_INSERT) 505 506 def get_lineout_insert(self): 507 return self._get_value(EV_SW, SW_LINEOUT_INSERT) 508 509 def is_touchpad(self): 510 return ((EV_KEY in self.events) and 511 (BTN_TOOL_FINGER in self.events[EV_KEY]) and 512 (EV_ABS in self.events)) 513 514 def is_keyboard(self): 515 return ((EV_KEY in self.events) and 516 (KEY_F2 in self.events[EV_KEY])) 517 518 def is_touchscreen(self): 519 return ((EV_KEY in self.events) and 520 (BTN_TOUCH in self.events[EV_KEY]) and 521 (not BTN_TOOL_FINGER in self.events[EV_KEY]) and 522 (EV_ABS in self.events)) 523 524 def is_mt_b(self): 525 return self.is_mt() and ABS_MT_SLOT in self.events[EV_ABS] 526 527 def is_mt_a(self): 528 return self.is_mt() and ABS_MT_SLOT not in self.events[EV_ABS] 529 530 def is_mt(self): 531 return (EV_ABS in self.events and 532 (set(self.events[EV_ABS]) & set(ABS_MT_RANGE))) 533 534 def is_hp_jack(self): 535 return (EV_SW in self.events and 536 SW_HEADPHONE_INSERT in self.events[EV_SW]) 537 538 def is_mic_jack(self): 539 return (EV_SW in self.events and 540 SW_MICROPHONE_INSERT in self.events[EV_SW]) 541 542 def is_audio_jack(self): 543 return (EV_SW in self.events and 544 ((SW_HEADPHONE_INSERT in self.events[EV_SW]) or 545 (SW_MICROPHONE_INSERT in self.events[EV_SW] or 546 (SW_LINEOUT_INSERT in self.events[EV_SW])))) 547 548 """ Debug helper print functions """ 549 550 def print_abs_info(self, axis): 551 if EV_ABS in self.events and axis in self.events[EV_ABS]: 552 a = self.events[EV_ABS][axis] 553 print ' Value %6d' % a.value 554 print ' Min %6d' % a.min 555 print ' Max %6d' % a.max 556 if a.fuzz != 0: 557 print ' Fuzz %6d' % a.fuzz 558 if a.flat != 0: 559 print ' Flat %6d' % a.flat 560 if a.resolution != 0: 561 print ' Resolution %6d' % a.resolution 562 563 def print_props(self): 564 print ('Input driver Version: %d.%d.%d' % 565 (self.version[0], self.version[1], self.version[2])) 566 print ('Input device ID: bus %x vendor %x product %x version %x' % 567 (self.id_bus, self.id_vendor, self.id_product, self.id_version)) 568 print 'Input device name: "%s"' % (self.name) 569 for t in self.events: 570 print ' Event type %d (%s)' % (t, EV_TYPES.get(t, '?')) 571 for c in self.events[t]: 572 if (t in EV_STRINGS): 573 code = EV_STRINGS[t].get(c, '?') 574 print ' Event code %s (%d)' % (code, c) 575 else: 576 print ' Event code (%d)' % (c) 577 self.print_abs_info(c) 578 579 def get_slots(self): 580 """ Get those slots with positive tracking IDs. """ 581 slot_dict = OrderedDict() 582 for slot_index in range(self.num_slots): 583 slot = self.mt_slots[slot_index] 584 if self._get_tid(slot) == -1: 585 continue 586 slot_id = self._convert_slot_index_to_slot_id(slot_index) 587 slot_dict[slot_id] = slot 588 return slot_dict 589 590 def print_slots(self): 591 slot_dict = self.get_slots() 592 for slot_id, slot in slot_dict.items(): 593 print 'slot #%d' % slot_id 594 for a in slot: 595 abs = EV_STRINGS[EV_ABS].get(a, '?') 596 print ' %s = %6d' % (abs, slot[a].value) 597 598 599def print_report(device): 600 print '----- EV_SYN -----' 601 if device.is_touchpad(): 602 f = device.get_num_fingers() 603 if f == 0: 604 return 605 x = device.get_x() 606 y = device.get_y() 607 z = device.get_pressure() 608 l = device.get_left() 609 print 'Left=%d Fingers=%d X=%d Y=%d Pressure=%d' % (l, f, x, y, z) 610 if device.is_mt(): 611 device.print_slots() 612 613 614if __name__ == "__main__": 615 from optparse import OptionParser 616 import glob 617 parser = OptionParser() 618 619 parser.add_option("-a", "--audio_jack", action="store_true", 620 dest="audio_jack", default=False, 621 help="Find and use all audio jacks") 622 parser.add_option("-d", "--devpath", dest="devpath", 623 default="/dev/input/event0", 624 help="device path (/dev/input/event0)") 625 parser.add_option("-q", "--quiet", action="store_false", dest="verbose", 626 default=True, help="print less messages to stdout") 627 parser.add_option("-t", "--touchpad", action="store_true", dest="touchpad", 628 default=False, help="Find and use first touchpad device") 629 (options, args) = parser.parse_args() 630 631 # TODO: Use gudev to detect touchpad 632 devices = [] 633 if options.touchpad: 634 for path in glob.glob('/dev/input/event*'): 635 device = InputDevice(path) 636 if device.is_touchpad(): 637 print 'Using touchpad %s.' % path 638 options.devpath = path 639 devices.append(device) 640 break 641 else: 642 print 'No touchpad found!' 643 exit() 644 elif options.audio_jack: 645 for path in glob.glob('/dev/input/event*'): 646 device = InputDevice(path) 647 if device.is_audio_jack(): 648 devices.append(device) 649 device = None 650 elif os.path.exists(options.devpath): 651 print 'Using %s.' % options.devpath 652 devices.append(InputDevice(options.devpath)) 653 else: 654 print '%s does not exist.' % options.devpath 655 exit() 656 657 for device in devices: 658 device.print_props() 659 if device.is_touchpad(): 660 print ('x: (%d,%d), y: (%d,%d), z: (%d, %d)' % 661 (device.get_x_min(), device.get_x_max(), 662 device.get_y_min(), device.get_y_max(), 663 device.get_pressure_min(), device.get_pressure_max())) 664 device.print_slots() 665 print 'Number of fingers: %d' % device.get_num_fingers() 666 print 'Current slot id: %d' % device.get_current_slot_id() 667 print '------------------' 668 print 669 670 ev = InputEvent() 671 while True: 672 _rl, _, _ = select.select([d.f for d in devices], [], []) 673 for fd in _rl: 674 # Lookup for the device which owns fd. 675 device = [d for d in devices if d.f == fd][0] 676 try: 677 ev.read(fd) 678 except KeyboardInterrupt: 679 exit() 680 is_syn = device.process_event(ev) 681 print ev 682 if is_syn: 683 print_report(device) 684