From 155262ade4d933a8dd50cb1669193c77be77c019 Mon Sep 17 00:00:00 2001 From: Devin Bayer Date: Wed, 11 Jun 2025 15:26:24 +0200 Subject: [PATCH] Proxy fixes and logging clarity improvements --- facedancer/filters/logging.py | 18 +- facedancer/filters/standard.py | 35 ++-- facedancer/proxy.py | 348 +++++++++++++++++++-------------- 3 files changed, 232 insertions(+), 169 deletions(-) diff --git a/facedancer/filters/logging.py b/facedancer/filters/logging.py index 8f7ad831..39f72b05 100644 --- a/facedancer/filters/logging.py +++ b/facedancer/filters/logging.py @@ -4,6 +4,8 @@ import datetime +from facedancer.request import USBControlRequest + from ..logging import log from . import USBProxyFilter @@ -23,7 +25,7 @@ def __init__(self, verbose=4, decoration=''): - def filter_control_in(self, req, data, stalled): + def filter_control_in(self, req: USBControlRequest | None, data, stalled): """ Log IN control requests without modification. """ @@ -33,7 +35,7 @@ def filter_control_in(self, req, data, stalled): return req, data, stalled if self.verbose > 3: - log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) + log.info("{} {}< control {}".format(self.timestamp(), self.decoration, req)) if self.verbose > 3 and stalled: log.info("{} {}< --STALLED-- ".format(self.timestamp(), self.decoration)) @@ -57,7 +59,7 @@ def filter_control_out(self, req, data): return req, data if self.verbose > 3: - log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) + log.info("{} {}> control {}".format(self.timestamp(), self.decoration, req)) if self.verbose > 4 and data: self._pretty_print_data(data, '>', self.decoration) @@ -101,16 +103,18 @@ def filter_out(self, ep_num, data): def timestamp(self): """ Generate a quick timestamp for printing. """ - return datetime.datetime.now().strftime("[%H:%M:%S]") + return datetime.datetime.now().strftime("[%H:%M:%S.%f]") def _magic_decode(self, data): """ Simple decode function that attempts to find a nice string representation for the console.""" try: return bytes(data).decode('utf-16le') except: - return bytes(data) + return bytes(data).hex(' ', 2) def _pretty_print_data(self, data, direction_marker, decoration='', is_string=False, ep_marker=''): - data = self._magic_decode(data) if is_string else bytes(data) - log.info("{} {}{}{}: {}".format(self.timestamp(), ep_marker, decoration, direction_marker, data)) + decoded = self._magic_decode(data) if is_string else bytes(data).hex(' ', 2) + if self.verbose < 6 and len(data) >= 30: + decoded = decoded[:30] + '…' + log.info("{} {}{}{}: {} ({})".format(self.timestamp(), ep_marker, decoration, direction_marker, decoded, len(data))) diff --git a/facedancer/filters/standard.py b/facedancer/filters/standard.py index 46d58eef..b3535a38 100644 --- a/facedancer/filters/standard.py +++ b/facedancer/filters/standard.py @@ -24,6 +24,8 @@ class USBProxySetupFilters(USBProxyFilter): MAX_PACKET_SIZE_EP0 = 64 + configurations: dict[int, USBDescriptor] + def __init__(self, device, verbose=0): self.device = device self.configurations = {} @@ -48,7 +50,7 @@ def filter_control_in(self, req, data, stalled): if descriptor_type == self.DESCRIPTOR_CONFIGURATION and req.length >= 32: configuration = USBDescribable.from_binary_descriptor(data) self.configurations[configuration.number] = configuration - if self.verbose > 0: + if self.verbose > 2: log.info("-- Storing configuration {} --".format(configuration)) @@ -56,10 +58,12 @@ def filter_control_in(self, req, data, stalled): # Patch our data to overwrite the maximum packet size on EP0. # See USBProxy.connect for a rationale on this. device = USBDescribable.from_binary_descriptor(data) - device.max_packet_size_ep0 = 64 - data = bytearray(device.get_descriptor())[:len(data)] - if self.verbose > 0: - log.info("-- Patched device descriptor. --") + old = device.max_packet_size_ep0 + if old != 64: + device.max_packet_size_ep0 = 64 + data = bytearray(device.get_descriptor())[:len(data)] + if self.verbose > 0: + log.info(f"-- Patched device descriptor. max_packet_size_ep0 {old} to 64 --") return req, data, stalled @@ -69,13 +73,14 @@ def filter_control_out(self, req, data): # handle it ourself, and absorb it. if req.get_recipient() == self.RECIPIENT_DEVICE and \ req.request == self.SET_ADDRESS_REQUEST: + log.info(f"setup: out - handle set_address {req}") req.acknowledge(blocking=True) self.device.set_address(req.value) return None, None - # Special case: if this is a SET_CONFIGURATION_REQUEST, - # pass it through, but also set up the Facedancer hardware - # in response. + # Special case: SET_CONFIGURATION_REQUEST + # libusb1 recommends always calling setConfiguration() instead + # of sending the control packet manually if req.get_recipient() == self.RECIPIENT_DEVICE and \ req.request == self.SET_CONFIGURATION_REQUEST: configuration_index = req.value @@ -84,8 +89,10 @@ def filter_control_out(self, req, data): if configuration_index in self.configurations: configuration = self.configurations[configuration_index] - if self.verbose > 0: + if self.verbose > 2: log.info("-- Applying configuration {} --".format(configuration)) + elif self.verbose > 0: + log.info(f"-- Applying configuration {configuration.get_identifier()} --") self.device.configured(configuration) @@ -94,13 +101,17 @@ def filter_control_out(self, req, data): else: log.warning("-- WARNING: Applying configuration {}, but we've never read that configuration's descriptor! --".format(configuration_index)) - # Special case: if this is a SET_INTERFACE_REQUEST, - # pass it through, but also tell the device so it can update - # its current configuration. + return None, None + + # Special case: SET_INTERFACE_REQUEST + # libusb1 recommends calling setInterfaceAlternative() if req.get_recipient() == self.RECIPIENT_INTERFACE and \ req.request == self.SET_INTERFACE_REQUEST: interface_number = req.index alternate = req.value + log.info(f"setup: out - handle SET_INTERFACE_REQUEST {req}") self.device.interface_changed(interface_number, alternate) + return None, None + return req, data diff --git a/facedancer/proxy.py b/facedancer/proxy.py index 7730d037..4e6b4559 100644 --- a/facedancer/proxy.py +++ b/facedancer/proxy.py @@ -1,25 +1,26 @@ # # This file is part of Facedancer. # -""" USB Proxy implementation. """ +"""USB Proxy implementation.""" import atexit import platform import usb1 import sys -from usb1 import USBError, USBErrorTimeout +from usb1 import USBError, USBErrorTimeout -from . import DeviceSpeed, USBConfiguration, USBDirection -from .device import USBBaseDevice -from .errors import DeviceNotFoundError -from .logging import log -from .request import USBControlRequest -from .types import USB +from . import DeviceSpeed, USBConfiguration, USBDirection +from .endpoint import USBEndpoint +from .device import USBBaseDevice +from .errors import DeviceNotFoundError +from .logging import log +from .request import USBControlRequest +from .types import USB class USBProxyDevice(USBBaseDevice): - """ USB Proxy Device """ + """USB Proxy Device""" name = "USB Proxy Device" @@ -29,7 +30,6 @@ def __init__(self, index=0, quirks=[], scheduler=None, **kwargs): """ Sets up a new USBProxy instance. """ - # Finally, initialize our base class with a minimal set of # parameters. We'll do almost nothing, as we'll be proxying # packets by default to the device. @@ -41,15 +41,16 @@ def __init__(self, index=0, quirks=[], scheduler=None, **kwargs): # Find the device to proxy matching the given keyword arguments... usb_devices = list(self.proxied_device.find(find_all=True, **kwargs)) if len(usb_devices) <= index: - raise DeviceNotFoundError(f"Could not find device to proxy.") + raise DeviceNotFoundError("Could not find device to proxy.") device = usb_devices[index] # Open a connection to the proxied device and attempt to # detach it from any kernel-side driver that may prevent us # from communicating with it... - device_handle = self.proxied_device.open(device, detach=True) - log.info(f"Found {self.proxied_device.device_speed().name} speed device to proxy: {device}") - + self.proxied_device.open(device, detach=True) + log.info( + f"Found {self.proxied_device.device_speed().name} speed device to proxy: {device}" + ) def add_filter(self, filter_object, head=False): """ @@ -60,7 +61,6 @@ def add_filter(self, filter_object, head=False): else: self.filter_list.append(filter_object) - def connect(self): """ Initialize this device. We perform a reduced initialization, as we really @@ -89,7 +89,6 @@ def connect(self): # skipping USB.state_attached may not be strictly correct (9.1.1.{1,2}) self.state = USB.state_powered - # - event handlers -------------------------------------------------------- def configured(self, configuration: USBConfiguration): @@ -102,18 +101,21 @@ def configured(self, configuration: USBConfiguration): configuration: The configuration to be applied. """ + self.proxied_device.setConfiguration(configuration.number) + # All interfaces on the configuration are set to their default setting. configuration.active_interfaces = { - interface.number : interface - for interface in configuration.get_interfaces() - if interface.alternate == 0 + interface.number: interface + for interface in configuration.get_interfaces() + if interface.alternate == 0 } # Pass our configuration on to the core device. self.backend.configured(configuration) - configuration.parent = self # FIXME Not great semantics + configuration.parent = self # FIXME Not great semantics self.configuration = configuration + self._ack_status_stage() def interface_changed(self, interface_number: int, alternate: int): """ @@ -126,14 +128,17 @@ def interface_changed(self, interface_number: int, alternate: int): interface_number: The interface number. alternate: The alternate setting to be applied. """ + log.debug(f"set interface num={interface_number} alt={alternate}") + self.proxied_device.setInterface(interface_number, alternate) + identifier = (interface_number, alternate) interface = self.configuration.interfaces[identifier] self.configuration.active_interfaces[interface_number] = interface - + self._ack_status_stage() def handle_bus_reset(self): super().handle_bus_reset() - + self.proxied_device.reset() def handle_request(self, request: USBControlRequest): """ @@ -145,20 +150,18 @@ def handle_request(self, request: USBControlRequest): else: self._proxy_out_control_request(request) - def handle_get_configuration_request(self, request): super().handle_get_configuration_request(request) - def handle_get_descriptor_request(self, request): super().handle_get_descriptor_request(request) - - def handle_data_available(self, ep_num, data): + def handle_data_received(self, endpoint, data): """ Handles the case where data is ready from the Facedancer device that needs to be proxied to the target device. """ + ep_num = endpoint.number # Run the data through all of our filters. for f in self.filter_list: @@ -168,7 +171,7 @@ def handle_data_available(self, ep_num, data): if data: try: self.proxied_device.write(ep_num, data) - except USBError as e: + except USBError: stalled = True for f in self.filter_list: @@ -177,31 +180,61 @@ def handle_data_available(self, ep_num, data): if stalled: self.backend.stall_endpoint(0, USBDirection.OUT) + def handle_data_requested(self, endpoint: USBEndpoint): + """Handler called when the host requests data on a non-control endpoint. - def handle_nak(self, ep_num): - """ - Handles a NAK, which means that the target asked the proxied device - to participate in a transfer. We use this as our cue to participate - in communications. - """ - # Make sure the endpoint exists for the current configuration - # before attempting to handle NAK events. - # Skip handling OUT endpoints, as we handle those in handle_data_available. - endpoint = self.configuration.get_endpoint(ep_num, USBDirection.IN) - if endpoint is None: - return + Typically, this method will delegate the request to the appropriate + configuration+interface+endpoint. If overridden, the + overriding function will receive all events. + Args: + endpoint_number : The endpoint number on which the host requested data. + """ # TODO: Currently, we use this for _all_ non-control transfers, as we # don't e.g. periodically schedule isochronous or interrupt transfers. # We probably should set up those to be independently scheduled and # then limit this to only bulk endpoints. - self._proxy_in_transfer(endpoint) + ep_num = endpoint.number + # Filter the "IN token" generated by the target device. We can use this + # to e.g. change the endpoint before proxying to the target device, or + # to absorb a packet before it's proxied. + for f in self.filter_list: + ep_num = f.filter_in_token(ep_num) + + if ep_num is None: + return + + try: + # Quick hack to improve responsiveness on interrupt endpoints. + if endpoint.interval: + data = self.proxied_device.read( + ep_num, endpoint.max_packet_size, timeout=endpoint.interval + ) + else: + data = self.proxied_device.read(ep_num, endpoint.max_packet_size) + + except usb1.USBErrorPipe: + self.proxied_device.clear_halt(ep_num, USBDirection.IN) + return + except USBErrorTimeout: + return + + # Run the data through all of our filters. + for f in self.filter_list: + ep_num, data = f.filter_in(endpoint.number, data) + + # If our data wasn't filtered out, transmit it to the target! + if data: + if not endpoint.get_device(): + log.warning("endpoint has no device") + return + endpoint.send(data) # - helpers --------------------------------------------------------------- - def _ack_status_stage(self, blocking=False): - self.backend.ack_status_stage(blocking=blocking) + def _ack_status_stage(self): + self.backend.ack_status_stage(blocking=True) def _proxy_in_control_request(self, request: USBControlRequest): """ @@ -236,14 +269,14 @@ def _proxy_in_control_request(self, request: USBControlRequest): index=request.index, length=request.length, ) - except USBError as e: + except USBError: stalled = True # Run filters here. for f in self.filter_list: request, data, stalled = f.filter_control_in(request, data, stalled) - #... and proxy it to our victim. + # ... and proxy it to our victim. if stalled: # TODO: allow stalling of eps other than 0! self.backend.stall_endpoint(0, USBDirection.IN) @@ -251,7 +284,6 @@ def _proxy_in_control_request(self, request: USBControlRequest): # TODO: support control endpoints other than 0 self.control_send(0, request, data) - def _proxy_out_control_request(self, request: USBControlRequest): """ Proxy OUT requests, which sends a request from the victim to the @@ -271,75 +303,35 @@ def _proxy_out_control_request(self, request: USBControlRequest): request=request.request, value=request.value, index=request.index, - data=data + data=data, ) self._ack_status_stage() # Special case: we've stalled, allow the filters to decide what to do. - except USBError as e: + except USBError: stalled = True for f in self.filter_list: - request, data, stalled = f.handle_out_request_stall(request, data, stalled) + request, data, stalled = f.handle_out_request_stall( + request, data, stalled + ) if stalled: self.backend.stall_endpoint(0, USBDirection.OUT) - def _proxy_in_transfer(self, endpoint): - """ - Proxy OUT requests, which sends a request from the target device to the - victim, at the target's request. - """ - - ep_num = endpoint.number - - # Filter the "IN token" generated by the target device. We can use this - # to e.g. change the endpoint before proxying to the target device, or - # to absorb a packet before it's proxied. - for f in self.filter_list: - ep_num = f.filter_in_token(ep_num) - - if ep_num is None: - return - - try: - # Quick hack to improve responsiveness on interrupt endpoints. - if endpoint.interval: - data = self.proxied_device.read(ep_num, endpoint.max_packet_size, timeout=endpoint.interval) - else: - data = self.proxied_device.read(ep_num, endpoint.max_packet_size) - - except usb1.USBErrorPipe: - self.proxied_device.clear_halt(ep_num, USBDirection.IN) - return - except USBErrorTimeout: - return - - # Run the data through all of our filters. - for f in self.filter_list: - ep_num, data = f.filter_in(endpoint.number, data) - - # If our data wasn't filtered out, transmit it to the target! - if data: - endpoint.send(data) - - - class LibUSB1Device: - """ A wrapper around the proxied device based on libusb1. """ - + """A wrapper around the proxied device based on libusb1.""" """ Class variable that stores our global libusb library context. """ - context = None + context: usb1.USBContext | None = None """ Class variable that stores our device handle. """ - device_handle = None - + device_handle: usb1.USBDeviceHandle | None = None @classmethod - def _get_libusb_context(cls): - """ Retrieves the libusb context we'll use to fetch libusb device instances. """ + def _get_libusb_context(cls) -> usb1.USBContext: + """Retrieves the libusb context we'll use to fetch libusb device instances.""" # If we don't have a libusb context, create one. if cls.context is None: @@ -348,23 +340,16 @@ def _get_libusb_context(cls): return cls.context + @classmethod + def _handle(cls) -> usb1.USBDeviceHandle: + assert cls.device_handle is not None + return cls.device_handle @classmethod def _destroy_libusb_context(cls): - """ Destroys our libusb context on closing our python instance. """ + """Destroys our libusb context on closing our python instance.""" if cls.device_handle is not None: - device = cls.device_handle.getDevice() - number = cls.device_handle.getConfiguration() - active_configuration = next(filter(lambda c: c.getConfigurationValue() == number, device), None) - if active_configuration: - for interface in active_configuration: - number = interface[0].getNumber() - try: - cls.device_handle.releaseInterface(number) - except usb1.USBErrorNotFound as e: - log.warning(f"Failed to releace interface {0} for {device}") - pass - + cls._release() cls.device_handle.close() cls.device_handle = None @@ -372,44 +357,28 @@ def _destroy_libusb_context(cls): cls.context.close() cls.context = None - @classmethod - def open(cls, device, detach=True): + def open(cls, device: usb1.USBDevice, detach=True): + log.debug(f"opening {device}") cls.device_handle = device.open() try: cls.device_handle.setAutoDetachKernelDriver(detach) - except usb1.USBErrorNotSupported: - pass - - number = cls.device_handle.getConfiguration() - active_configuration = next(filter(lambda c: c.getConfigurationValue() == number, device), None) - if active_configuration: - for interface in active_configuration: - number = interface[0].getNumber() - try: - cls.device_handle.claimInterface(number) - except usb1.USBErrorAccess: - log.error(f"Failed to claim interface {number} for {device}") - if platform.system() == "Darwin": - log.error("You may need to run your proxy code as root.\n") - elif platform.system() == "Linux": - log.error("Please ensure you have configured an entry for the device in your") - log.error("/etc/udev/rules.d directory.\n") - elif platform.system() == "Windows": - log.error("You may need to experiment with the Zadig driver to access the device.\n") - sys.exit(1) + except usb1.USBErrorNotSupported as e: + log.debug(f"setAutoDetachKernelDriver: {e}") - return cls.device_handle + cls._claim() + return cls.device_handle # TODO adapt logic from pygreat usb1.py @classmethod def find(cls, idVendor, idProduct, find_all=True): - """ Finds a USB device by its identifiers. """ + """Finds a USB device by its identifiers.""" matching_devices = [] context = cls._get_libusb_context() + device: usb1.USBDevice for device in context.getDeviceList(): if device.getVendorID() == idVendor and device.getProductID() == idProduct: matching_devices.append(device) @@ -421,55 +390,132 @@ def find(cls, idVendor, idProduct, find_all=True): else: return None + @classmethod + def setInterface(cls, interface: int, alt: int): + """libusb1 recommends always using this instead of sending control packets.""" + log.info(f"LibUSB1Device setInterface {interface} alt {alt}") + cls._handle().setInterfaceAltSetting(interface, alt) @classmethod - def device_speed(cls): - return DeviceSpeed(cls.device_handle.getDevice().getDeviceSpeed()) + def setConfiguration(cls, number: int): + """libusb1 recommends always using this instead of sending control packets.""" + old = cls._handle().getConfiguration() + if old == number: + log.info(f"LibUSB1Device keeping configuration {number}") + return + log.info(f"LibUSB1Device set configuration {old} → {number}") + + # prevent kernel driver from re-attaching + cls._handle().setAutoDetachKernelDriver(0) + + cls._release() + cls._handle().setConfiguration(number) + cls._claim() + + cls._handle().setAutoDetachKernelDriver(1) @classmethod - def controlRead(cls, request_type, request, value, index, length, timeout=1000): - return cls.device_handle.controlRead(request_type, request, value, index, length, timeout) + def _release(cls): + active_configuration = cls.active_configuration() + if not active_configuration: + return + for interface in active_configuration: + number = interface[0].getNumber() + try: + log.info(f"Release interface {number}") + cls._handle().releaseInterface(number) + except usb1.USBErrorNotFound: + log.warning(f"Failed to release interface {number}") @classmethod - def controlWrite(cls, request_type, request, value, index, data, timeout=1000): - return cls.device_handle.controlWrite(request_type, request, value, index, data, timeout) + def _claim(cls): + log.info("Claiming interfaces") + active_configuration = cls.active_configuration() + if not active_configuration: + return + + for interface in active_configuration: + number = interface[0].getNumber() + try: + log.info(f"Claiming interface {number}") + cls._handle().claimInterface(number) + except usb1.USBErrorAccess: + log.error(f"Failed to claim interface {number}") + if platform.system() == "Darwin": + log.error("You may need to run your proxy code as root.\n") + elif platform.system() == "Linux": + log.error( + "Please ensure you have configured an entry for the device in your" + ) + log.error("/etc/udev/rules.d directory.\n") + elif platform.system() == "Windows": + log.error( + "You may need to experiment with the Zadig driver to access the device.\n" + ) + sys.exit(1) + + @classmethod + def active_configuration(cls) -> usb1.USBConfiguration | None: + device = cls._handle().getDevice() + number = cls._handle().getConfiguration() + for cfg in device: + if cfg.getConfigurationValue() == number: + return cfg + + @classmethod + def device_speed(cls): + return DeviceSpeed(cls._handle().getDevice().getDeviceSpeed()) + @classmethod + def controlRead(cls, request_type, request, value, index, length, timeout=1000): + return cls._handle().controlRead( + request_type, request, value, index, length, timeout + ) + + @classmethod + def controlWrite(cls, request_type, request, value, index, data, timeout=1000): + return cls._handle().controlWrite( + request_type, request, value, index, data, timeout + ) @classmethod def read(cls, endpoint_number, length, timeout=1000): # Avoid accidental uses of endpoint address - endpoint_number = endpoint_number & 0x7f + endpoint_number = endpoint_number & 0x7F # TODO support interrupt endpoints - return cls.device_handle.bulkRead(endpoint_number, length, timeout) - + return cls._handle().bulkRead(endpoint_number, length, timeout) @classmethod def write(cls, endpoint_number, data, timeout=1000): # TODO support interrupt endpoints - return cls.device_handle.bulkWrite(endpoint_number, data, timeout) - + return cls._handle().bulkWrite(endpoint_number, data, timeout) @classmethod def clear_halt(cls, endpoint_number, direction): endpoint_address = direction.to_endpoint_address(endpoint_number) - return cls.device_handle.clearHalt(endpoint_address) + return cls._handle().clearHalt(endpoint_address) + + @classmethod + def reset(cls): + log.info("LibUSB1Device reset") + cls._handle().resetDevice() + cls._claim() if __name__ == "__main__": - from . import FacedancerUSBApp - from .filters.standard import USBProxySetupFilters - from .filters.logging import USBProxyPrettyPrintFilter + from .filters.standard import USBProxySetupFilters + from .filters.logging import USBProxyPrettyPrintFilter # akai midimix - VENDOR_ID = 0x09e8 + VENDOR_ID = 0x09E8 PRODUCT_ID = 0x0031 # xbox controller - #VENDOR_ID = 0x045e - #PRODUCT_ID = 0x02d1 + # VENDOR_ID = 0x045e + # PRODUCT_ID = 0x02d1 device = USBProxyDevice(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) @@ -478,7 +524,9 @@ def clear_halt(cls, endpoint_number, direction): async def configure_logging(): import logging + logging.getLogger("facedancer").setLevel(logging.INFO) from facedancer import main + main(device, configure_logging())