# legoEv3/evdev/device.py
# encoding: utf-8
import os
from select import select
from collections import namedtuple
from evdev import _input, _uinput, ecodes, util
from evdev.events import InputEvent
#--------------------------------------------------------------------------
# Plain tuple layouts; the public classes that follow subclass these to add
# ``__str__`` formatting.
_AbsInfo = namedtuple('AbsInfo', 'value min max fuzz flat resolution')
_KbdInfo = namedtuple('KbdInfo', 'repeat delay')
_DeviceInfo = namedtuple('DeviceInfo', 'bustype vendor product version')
class AbsInfo(_AbsInfo):
    '''
    A ``namedtuple`` for storing absolute axis information -
    corresponds to the ``input_absinfo`` struct:

    **value**
      Latest reported value for the axis.

    **min**
      Specifies minimum value for the axis.

    **max**
      Specifies maximum value for the axis.

    **fuzz**
      Specifies fuzz value that is used to filter noise from the
      event stream.

    **flat**
      Values that are within this value will be discarded by joydev
      interface and reported as 0 instead.

    **resolution**
      Specifies resolution for the values reported for the axis.
      Resolution for main axes (``ABS_X, ABS_Y, ABS_Z``) is reported
      in units per millimeter (units/mm), resolution for rotational
      axes (``ABS_RX, ABS_RY, ABS_RZ``) is reported in units per
      radian.

    .. note:: The input core does not clamp reported values to the
       ``[minimum, maximum]`` limits, such task is left to userspace.
    '''

    # An empty ``__slots__`` stops the subclass from growing a per-instance
    # ``__dict__``, so instances stay as small as the underlying tuple.
    __slots__ = ()

    def __str__(self):
        '''Return all six fields, in tuple order, as one readable line.'''
        return 'val {}, min {}, max {}, fuzz {}, flat {}, res {}'.format(*self)
class KbdInfo(_KbdInfo):
    '''
    Keyboard repeat rate:

    **repeat**
      Keyboard repeat rate in characters per second.

    **delay**
      Amount of time that a key must be depressed before it will start
      to repeat (in milliseconds).
    '''

    # An empty ``__slots__`` stops the subclass from growing a per-instance
    # ``__dict__``, so instances stay as small as the underlying tuple.
    __slots__ = ()

    def __str__(self):
        '''Return both fields, in tuple order, as one readable line.'''
        return 'repeat {}, delay {}'.format(*self)
class DeviceInfo(_DeviceInfo):
    '''
    A ``namedtuple`` identifying an input device by its ``bustype``,
    ``vendor``, ``product`` and ``version`` fields.
    '''

    # An empty ``__slots__`` stops the subclass from growing a per-instance
    # ``__dict__``, so instances stay as small as the underlying tuple.
    __slots__ = ()

    def __str__(self):
        '''Return the four fields as zero-padded, 4-digit hexadecimal values.'''
        msg = 'bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}'
        return msg.format(*self)
class InputDevice(object):
    '''
    A linux input device from which input events can be read.
    '''

    __slots__ = ('fn', 'fd', 'info', 'name', 'phys', '_rawcapabilities',
                 'version', 'ff_effects_count')

    def __init__(self, dev):
        '''
        :param dev: path to input device
        '''

        #: Path to input device.
        self.fn = dev

        #: A non-blocking file descriptor to the device file.
        self.fd = os.open(dev, os.O_RDWR | os.O_NONBLOCK)

        # Returns (bustype, vendor, product, version, name, phys, capabilities).
        info_res = _input.ioctl_devinfo(self.fd)

        #: A :class:`DeviceInfo <evdev.device.DeviceInfo>` instance.
        self.info = DeviceInfo(*info_res[:4])

        #: The name of the event device.
        self.name = info_res[4]

        #: The physical topology of the device.
        self.phys = info_res[5]

        #: The evdev protocol version.
        self.version = _input.ioctl_EVIOCGVERSION(self.fd)

        #: The raw dictionary of device capabilities - see `:func:capabilities()`.
        self._rawcapabilities = _input.ioctl_capabilities(self.fd)

        #: The number of force feedback effects the device can keep in its memory.
        self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd)

    def __del__(self):
        '''Close the file descriptor when the instance is finalized.'''
        # ``fd`` is an unset slot when ``os.open()`` raised in ``__init__``.
        if hasattr(self, 'fd') and self.fd is not None:
            try:
                self.close()
            except OSError:
                # Best effort: an error cannot be reported usefully from a finalizer.
                pass

    def _capabilities(self, absinfo=True):
        '''
        Build the ``{event type: [codes]}`` mapping from the raw
        capabilities dictionary.

        Tuple entries carry absolute axis data; they become
        ``(code, AbsInfo)`` pairs when ``absinfo`` is true and plain
        codes otherwise. Event types with no codes are left out.
        '''
        res = {}
        # The loop variable is ``codes`` so that the imported ``ecodes``
        # module is not shadowed inside this method.
        for etype, codes in self._rawcapabilities.items():
            for code in codes:
                entries = res.setdefault(etype, [])
                if not isinstance(code, tuple):
                    entries.append(code)
                elif absinfo:
                    # code -> (axis code, axis data), e.g. (0, (0, 0, 0, 255, 0, 0))
                    entries.append((code[0], AbsInfo(*code[1])))
                else:
                    entries.append(code[0])
        return res

    def capabilities(self, verbose=False, absinfo=True):
        '''
        Return the event types that this device supports as a mapping of
        supported event types to lists of handled event codes. Example::

          { 1: [272, 273, 274],
            2: [0, 1, 6, 8] }

        If ``verbose`` is ``True``, event codes and types will be resolved
        to their names. Example::

          { ('EV_KEY', 1): [('BTN_MOUSE',  272),
                            ('BTN_RIGHT',  273),
                            ('BTN_MIDDLE', 274)],
            ('EV_REL', 2): [('REL_X',      0),
                            ('REL_Y',      1),
                            ('REL_HWHEEL', 6),
                            ('REL_WHEEL',  8)] }

        Unknown codes or types will be resolved to ``'?'``.

        If ``absinfo`` is ``True``, the list of capabilities will also
        include absolute axis information in the form of
        :class:`AbsInfo` instances::

          { 3: [ (0, AbsInfo(min=0, max=255, fuzz=0, flat=0)),
                 (1, AbsInfo(min=0, max=255, fuzz=0, flat=0)) ]}

        Combined with ``verbose`` the above becomes::

          { ('EV_ABS', 3): [ (('ABS_X', 0), AbsInfo(min=0, max=255, fuzz=0, flat=0)),
                             (('ABS_Y', 1), AbsInfo(min=0, max=255, fuzz=0, flat=0)) ]}
        '''
        caps = self._capabilities(absinfo)
        if verbose:
            return dict(util.resolve_ecodes(caps))
        return caps

    def leds(self, verbose=False):
        '''
        Return currently set LED keys. For example::

          [0, 1, 8, 9]

        If ``verbose`` is ``True``, event codes are resolved to their
        names. Unknown codes are resolved to ``'?'``. For example::

          [('LED_NUML', 0), ('LED_CAPSL', 1), ('LED_MISC', 8), ('LED_MAIL', 9)]
        '''
        leds = _input.get_sw_led_snd(self.fd, ecodes.EV_LED)
        if verbose:
            return [(ecodes.LED[led] if led in ecodes.LED else '?', led)
                    for led in leds]
        return leds

    def set_led(self, led_num, value):
        '''
        Set the state of the selected LED. For example::

          device.set_led(ecodes.LED_NUML, 1)
        '''
        _uinput.write(self.fd, ecodes.EV_LED, led_num, value)

    def __eq__(self, other):
        '''Two devices are equal if their :data:`info` attributes are equal.'''
        try:
            other_info = other.info
        except AttributeError:
            # Not device-like: let Python fall back to its default comparison
            # instead of leaking an AttributeError out of ``==``.
            return NotImplemented
        return self.info == other_info

    def __str__(self):
        '''Return the device path, name and physical topology on one line.'''
        msg = 'device {}, name "{}", phys "{}"'
        return msg.format(self.fn, self.name, self.phys)

    def __repr__(self):
        '''Return ``ClassName('<device path>')``.'''
        return '{}({!r})'.format(self.__class__.__name__, self.fn)

    def close(self):
        '''
        Close the device file descriptor, if it is still open.

        ``fd`` is reset to ``-1`` even when ``os.close()`` raises, so a
        later call (for instance from ``__del__``) does nothing.
        '''
        if self.fd > -1:
            try:
                os.close(self.fd)
            finally:
                self.fd = -1

    def fileno(self):
        '''
        Return the file descriptor to the open event device. This
        makes it possible to pass ``InputDevice`` instances
        directly to :func:`select.select()` and
        :class:`asyncore.file_dispatcher`.'''
        return self.fd

    def read_one(self):
        '''
        Read and return a single input event as an instance of
        :class:`InputEvent <evdev.events.InputEvent>`.

        Return ``None`` if there are no pending input events.
        '''
        # event -> (sec, usec, type, code, val)
        event = _input.device_read(self.fd)
        if event:
            return InputEvent(*event)
        return None

    def read_loop(self):
        '''Enter an endless ``select()`` loop that yields input events.'''
        while True:
            # Wait until the descriptor is readable; the returned lists are
            # not needed because only one descriptor is watched.
            select([self.fd], [], [])
            for event in self.read():
                yield event

    def read(self):
        '''
        Read multiple input events from device. Return a generator
        object that yields :class:`InputEvent
        <evdev.events.InputEvent>` instances.
        '''
        # events -> [(sec, usec, type, code, val), ...]
        events = _input.device_read_many(self.fd)
        for raw_event in events:
            yield InputEvent(*raw_event)

    def grab(self):
        '''
        Grab input device using ``EVIOCGRAB`` - other applications will
        be unable to receive events until the device is released. Only
        one process can hold a ``EVIOCGRAB`` on a device.

        .. warning:: Grabbing an already grabbed device will raise an
                     ``IOError``.'''
        _input.ioctl_EVIOCGRAB(self.fd, 1)

    def ungrab(self):
        '''Release device if it has been already grabbed (uses
        `EVIOCGRAB`).

        .. warning:: Releasing an already released device will raise an
                     ``IOError('Invalid argument')``.'''
        _input.ioctl_EVIOCGRAB(self.fd, 0)

    def upload_effect(self, effect):
        '''
        Upload a force feedback effect to a force feedback device.

        :param effect: object exposing the buffer protocol (presumably a
                       ctypes effect structure - confirm against callers)
        :return: the value returned by ``_input.upload_effect``
        '''
        # ``memoryview`` exists on both Python 2.7 and Python 3, whereas the
        # ``buffer`` builtin used here before is Python 2 only.
        data = memoryview(effect).tobytes()
        ff_id = _input.upload_effect(self.fd, data)
        return ff_id

    def erase_effect(self, ff_id):
        '''Erase a force effect from a force feedback device. This
        also stops the effect.'''
        _input.erase_effect(self.fd, ff_id)

    @property
    def repeat(self):
        '''Get or set the keyboard repeat rate and delay as a
        :class:`KbdInfo` ``(repeat, delay)`` tuple.'''
        # NOTE(review): this docstring used to say "characters per minute"
        # while KbdInfo documents "characters per second" - confirm the unit.
        return KbdInfo(*_input.ioctl_EVIOCGREP(self.fd))

    @repeat.setter
    def repeat(self, value):
        return _input.ioctl_EVIOCSREP(self.fd, *value)

    def active_keys(self, verbose=False):
        '''
        Return currently active keys. Example::

          [1, 42]

        If ``verbose`` is ``True``, key codes are resolved to their
        verbose names. Unknown codes are resolved to ``'?'``. For
        example::

          [('KEY_ESC', 1), ('KEY_LEFTSHIFT', 42)]
        '''
        active_keys = _input.ioctl_EVIOCGKEY(self.fd)
        if verbose:
            return [(ecodes.KEY[key] if key in ecodes.KEY else '?', key)
                    for key in active_keys]
        return active_keys