1#!/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3# -*- coding: utf-8 -*-
4#
5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
6# Copyright (c) 2017 Red Hat, Inc.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21import fcntl
22import functools
23import libevdev
24import os
25
26try:
27    import pyudev
28except ImportError:
29    raise ImportError("UHID is not supported due to missing pyudev dependency")
30
31import logging
32
33import hidtools.hid as hid
34from hidtools.uhid import UHIDDevice
35from hidtools.util import BusType
36
37from pathlib import Path
38from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
39
40logger = logging.getLogger("hidtools.device.base_device")
41
42
43class SysfsFile(object):
44    def __init__(self, path):
45        self.path = path
46
47    def __set_value(self, value):
48        with open(self.path, "w") as f:
49            return f.write(f"{value}\n")
50
51    def __get_value(self):
52        with open(self.path) as f:
53            return f.read().strip()
54
55    @property
56    def int_value(self) -> int:
57        return int(self.__get_value())
58
59    @int_value.setter
60    def int_value(self, v: int) -> None:
61        self.__set_value(v)
62
63    @property
64    def str_value(self) -> str:
65        return self.__get_value()
66
67    @str_value.setter
68    def str_value(self, v: str) -> None:
69        self.__set_value(v)
70
71
72class LED(object):
73    def __init__(self, sys_path):
74        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
75        self.__brightness = SysfsFile(sys_path / "brightness")
76
77    @property
78    def brightness(self) -> int:
79        return self.__brightness.int_value
80
81    @brightness.setter
82    def brightness(self, value: int) -> None:
83        self.__brightness.int_value = value
84
85
86class PowerSupply(object):
87    """Represents Linux power_supply_class sysfs nodes."""
88
89    def __init__(self, sys_path):
90        self._capacity = SysfsFile(sys_path / "capacity")
91        self._status = SysfsFile(sys_path / "status")
92        self._type = SysfsFile(sys_path / "type")
93
94    @property
95    def capacity(self) -> int:
96        return self._capacity.int_value
97
98    @property
99    def status(self) -> str:
100        return self._status.str_value
101
102    @property
103    def type(self) -> str:
104        return self._type.str_value
105
106
107class HIDIsReady(object):
108    """
109    Companion class that binds to a kernel mechanism
110    and that allows to know when a uhid device is ready or not.
111
112    See :meth:`is_ready` for details.
113    """
114
115    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
116        self.uhid = uhid
117
118    def is_ready(self: "HIDIsReady") -> bool:
119        """
120        Overwrite in subclasses: should return True or False whether
121        the attached uhid device is ready or not.
122        """
123        return False
124
125
126class UdevHIDIsReady(HIDIsReady):
127    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
128    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
129    _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {}
130
131    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
132        super().__init__(uhid)
133        self._init_pyudev()
134
135    @classmethod
136    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
137        if cls._pyudev_context is None:
138            cls._pyudev_context = pyudev.Context()
139            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
140            cls._pyudev_monitor.filter_by("hid")
141            cls._pyudev_monitor.start()
142
143            UHIDDevice._append_fd_to_poll(
144                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
145            )
146
147    @classmethod
148    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
149        if cls._pyudev_monitor is None:
150            return
151        event: pyudev.Device
152        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
153            if event.action not in ["bind", "remove", "unbind"]:
154                return
155
156            logger.debug(f"udev event: {event.action} -> {event}")
157
158            id = int(event.sys_path.strip().split(".")[-1], 16)
159
160            device_ready, count = cls._uhid_devices.get(id, (False, 0))
161
162            ready = event.action == "bind"
163            if not device_ready and ready:
164                count += 1
165            cls._uhid_devices[id] = (ready, count)
166
167    def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]:
168        try:
169            return self._uhid_devices[self.uhid.hid_id]
170        except KeyError:
171            return (False, 0)
172
173
174class EvdevMatch(object):
175    def __init__(
176        self: "EvdevMatch",
177        *,
178        requires: List[Any] = [],
179        excludes: List[Any] = [],
180        req_properties: List[Any] = [],
181        excl_properties: List[Any] = [],
182    ) -> None:
183        self.requires = requires
184        self.excludes = excludes
185        self.req_properties = req_properties
186        self.excl_properties = excl_properties
187
188    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
189        for m in self.requires:
190            if not evdev.has(m):
191                return False
192        for m in self.excludes:
193            if evdev.has(m):
194                return False
195        for p in self.req_properties:
196            if not evdev.has_property(p):
197                return False
198        for p in self.excl_properties:
199            if evdev.has_property(p):
200                return False
201        return True
202
203
204class EvdevDevice(object):
205    """
206    Represents an Evdev node and its properties.
207    This is a stub for the libevdev devices, as they are relying on
208    uevent to get the data, saving us some ioctls to fetch the names
209    and properties.
210    """
211
212    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
213        self.sysfs = sysfs
214        self.event_node: Any = None
215        self.libevdev: Optional[libevdev.Device] = None
216
217        self.uevents = {}
218        # all of the interesting properties are stored in the input uevent, so in the parent
219        # so convert the uevent file of the parent input node into a dict
220        with open(sysfs.parent / "uevent") as f:
221            for line in f.readlines():
222                key, value = line.strip().split("=")
223                self.uevents[key] = value.strip('"')
224
225        # we open all evdev nodes in order to not miss any event
226        self.open()
227
228    @property
229    def name(self: "EvdevDevice") -> str:
230        assert "NAME" in self.uevents
231
232        return self.uevents["NAME"]
233
234    @property
235    def evdev(self: "EvdevDevice") -> Path:
236        return Path("/dev/input") / self.sysfs.name
237
238    def matches_application(
239        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
240    ) -> bool:
241        if self.libevdev is None:
242            return False
243
244        if application in matches:
245            return matches[application].is_a_match(self.libevdev)
246
247        logger.error(
248            f"application '{application}' is unknown, please update/fix hid-tools"
249        )
250        assert False  # hid-tools likely needs an update
251
252    def open(self: "EvdevDevice") -> libevdev.Device:
253        self.event_node = open(self.evdev, "rb")
254        self.libevdev = libevdev.Device(self.event_node)
255
256        assert self.libevdev.fd is not None
257
258        fd = self.libevdev.fd.fileno()
259        flag = fcntl.fcntl(fd, fcntl.F_GETFD)
260        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
261
262        return self.libevdev
263
264    def close(self: "EvdevDevice") -> None:
265        if self.libevdev is not None and self.libevdev.fd is not None:
266            self.libevdev.fd.close()
267            self.libevdev = None
268        if self.event_node is not None:
269            self.event_node.close()
270            self.event_node = None
271
272
273class BaseDevice(UHIDDevice):
274    # default _application_matches that matches nothing. This needs
275    # to be set in the subclasses to have get_evdev() working
276    _application_matches: Dict[str, EvdevMatch] = {}
277
278    def __init__(
279        self,
280        name,
281        application,
282        rdesc_str: Optional[str] = None,
283        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
284        input_info=None,
285    ) -> None:
286        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
287        if rdesc_str is None and rdesc is None:
288            raise Exception("Please provide at least a rdesc or rdesc_str")
289        super().__init__()
290        if name is None:
291            name = f"uhid gamepad test {self.__class__.__name__}"
292        if input_info is None:
293            input_info = (BusType.USB, 1, 2)
294        self.name = name
295        self.info = input_info
296        self.default_reportID = None
297        self.opened = False
298        self.started = False
299        self.application = application
300        self._input_nodes: Optional[list[EvdevDevice]] = None
301        if rdesc is None:
302            assert rdesc_str is not None
303            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
304        else:
305            self.rdesc = rdesc  # type: ignore
306
307    @property
308    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
309        ps = self.walk_sysfs("power_supply", "power_supply/*")
310        if ps is None or len(ps) < 1:
311            return None
312
313        return PowerSupply(ps[0])
314
315    @property
316    def led_classes(self: "BaseDevice") -> List[LED]:
317        leds = self.walk_sysfs("led", "**/max_brightness")
318        if leds is None:
319            return []
320
321        return [LED(led.parent) for led in leds]
322
323    @property
324    def kernel_is_ready(self: "BaseDevice") -> bool:
325        return self._kernel_is_ready.is_ready()[0] and self.started
326
327    @property
328    def kernel_ready_count(self: "BaseDevice") -> int:
329        return self._kernel_is_ready.is_ready()[1]
330
331    @property
332    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
333        if self._input_nodes is not None:
334            return self._input_nodes
335
336        if not self.kernel_is_ready or not self.started:
337            return []
338
339        self._input_nodes = [
340            EvdevDevice(path)
341            for path in self.walk_sysfs("input", "input/input*/event*")
342        ]
343        return self._input_nodes
344
345    def match_evdev_rule(self, application, evdev):
346        """Replace this in subclasses if the device has multiple reports
347        of the same type and we need to filter based on the actual evdev
348        node.
349
350        returning True will append the corresponding report to
351        `self.input_nodes[type]`
352        returning False  will ignore this report / type combination
353        for the device.
354        """
355        return True
356
357    def open(self):
358        self.opened = True
359
360    def _close_all_opened_evdev(self):
361        if self._input_nodes is not None:
362            for e in self._input_nodes:
363                e.close()
364
365    def __del__(self):
366        self._close_all_opened_evdev()
367
368    def close(self):
369        self.opened = False
370
371    def start(self, flags):
372        self.started = True
373
374    def stop(self):
375        self.started = False
376        self._close_all_opened_evdev()
377
378    def next_sync_events(self, application=None):
379        evdev = self.get_evdev(application)
380        if evdev is not None:
381            return list(evdev.events())
382        return []
383
384    @property
385    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
386        return self._application_matches
387
388    @application_matches.setter
389    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
390        self._application_matches = data
391
392    def get_evdev(self, application=None):
393        if application is None:
394            application = self.application
395
396        if len(self.input_nodes) == 0:
397            return None
398
399        assert self._input_nodes is not None
400
401        if len(self._input_nodes) == 1:
402            evdev = self._input_nodes[0]
403            if self.match_evdev_rule(application, evdev.libevdev):
404                return evdev.libevdev
405        else:
406            for _evdev in self._input_nodes:
407                if _evdev.matches_application(application, self.application_matches):
408                    if self.match_evdev_rule(application, _evdev.libevdev):
409                        return _evdev.libevdev
410
411    def is_ready(self):
412        """Returns whether a UHID device is ready. Can be overwritten in
413        subclasses to add extra conditions on when to consider a UHID
414        device ready. This can be:
415
416        - we need to wait on different types of input devices to be ready
417          (Touch Screen and Pen for example)
418        - we need to have at least 4 LEDs present
419          (len(self.uhdev.leds_classes) == 4)
420        - or any other combinations"""
421        return self.kernel_is_ready
422