1#!/usr/bin/env python 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import ctypes 7import select 8 9import xi2 10import xlib 11 12 13class XI2Reader(object): 14 """A reader to create connection to X server and read x input events.""" 15 def __init__(self, display_name=':0'): 16 """Constructor 17 18 Args: 19 display_name: The X window display name. 20 """ 21 self._display = xlib.XOpenDisplay(display_name) 22 self._window = xlib.XDefaultRootWindow(self._display) 23 self._data = [] 24 25 self._register() 26 27 # Consumes the very first traffic within the connection with X server. 28 xlib.XFlush(self._display) 29 30 def _register(self): 31 """Registers device and events to listen on""" 32 mask = xi2.XIEventMask() 33 mask.deviceid = xi2.XIAllDevices 34 mask.mask_len = xi2.XIMaskLen(xi2.XI_RawMotion) 35 mask.mask = ctypes.cast((ctypes.c_ubyte * mask.mask_len)(), 36 ctypes.POINTER(ctypes.c_ubyte)) 37 38 self._set_mask(mask.mask, xi2.XI_RawKeyPress) 39 self._set_mask(mask.mask, xi2.XI_RawKeyRelease) 40 self._set_mask(mask.mask, xi2.XI_RawButtonPress) 41 self._set_mask(mask.mask, xi2.XI_RawButtonRelease) 42 self._set_mask(mask.mask, xi2.XI_RawMotion) 43 44 xi2.XISelectEvents(self._display, self._window, ctypes.pointer(mask), 1) 45 xlib.XSelectInput(self._display, self._window, ctypes.c_long(0)) 46 47 def _set_mask(self, ptr, event): 48 """Sets event mask""" 49 val = xi2.XISetMask(ptr, event) 50 ptr[event >> 3] = val 51 52 def get_valuator_names(self, device_id): 53 """Gets the valuator names for device. 54 55 Return: 56 An dictionary maps valuator index to descriptive names. 57 Sample output: 58 { 59 0: 'Rel X', 60 1: 'Rel Y', 61 2: 'Abs Start Timestamp', 62 3: 'Abs End Timestamp', 63 4: 'Rel Vert Wheel', 64 5: 'Rel Horiz Wheel' 65 } 66 """ 67 num_devices = ctypes.c_int() 68 device = xi2.XIQueryDevice(self._display, device_id, 69 ctypes.pointer(num_devices)).contents 70 71 valuator_names = [] 72 for i in range(device.num_classes): 73 if device.classes[i].contents.type == xi2.XIValuatorClass: 74 valuator_class_info = ctypes.cast(device.classes[i], 75 ctypes.POINTER(xi2.XIValuatorClassInfo)).contents 76 valuator_names.append(xlib.XGetAtomName(reader._display, 77 valuator_class_info.label)) 78 valuator_names_dict = {} 79 for i in range(len(valuator_names)): 80 valuator_names_dict[i] = valuator_names[i] 81 return valuator_names_dict 82 83 def get_connection_number(self): 84 """Gets the file descriptor number for the connection with X server""" 85 return xlib.XConnectionNumber(reader._display) 86 87 def read_pending_events(self): 88 """Read all the new event datas. 89 90 Return: 91 An array contains all event data with event type and valuator 92 values. Sample format: 93 { 94 'deviceid': 11, 95 'evtype': 17, 96 'time': 406752437L, 97 'valuators': { 98 0: (396.0, -38.0), 99 1: (578.0, -21.0), 100 2: (22802890.0, 22802890.0), 101 3: (26145746.0, 26145746.0) 102 } 103 } 104 """ 105 data = [] 106 while xlib.XPending(self._display): 107 xevent = xlib.XEvent() 108 xlib.XNextEvent(self._display, ctypes.pointer(xevent)) 109 cookie = xevent.xcookie 110 111 # Get event valuator_data 112 result = xlib.XGetEventData(self._display, ctypes.pointer(cookie)) 113 if (not result or cookie.type != xlib.GenericEvent): 114 continue 115 116 raw_event_ptr = ctypes.cast(cookie.data, 117 ctypes.POINTER(xi2.XIRawEvent)) 118 raw_event = raw_event_ptr.contents 119 valuator_state = raw_event.valuators 120 121 # Two value arrays 122 val_ptr = valuator_state.values 123 val_idx = 0 124 raw_val_ptr = raw_event.raw_values 125 raw_val_idx = 0 126 127 valuator_data = {} 128 for i in range(valuator_state.mask_len): 129 if xi2.XIMaskIsSet(valuator_state.mask, i): 130 valuator_data[i] = (val_ptr[val_idx], 131 raw_val_ptr[raw_val_idx]) 132 val_idx += 1 133 raw_val_idx += 1 134 data.append({'deviceid': raw_event.deviceid, 135 'evtype': cookie.evtype, 136 'time': raw_event.time, 137 'valuators': valuator_data}) 138 return data 139 140 141if __name__ == '__main__': 142 reader = XI2Reader() 143 fd = reader.get_connection_number() 144 145 while True: 146 rl, _, _ = select.select([fd], [], []) 147 if fd not in rl: 148 break 149 print reader.read_pending_events() 150