Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion py/generate_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ def generate_code(self, enhancements: dict[str, Any] | None = None) -> str:
local_imports.append("from selenium.webdriver.common.bidi.common import command_builder")
if self.events:
local_imports.append(
"from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventWrapper, _EventManager"
"from selenium.webdriver.common.bidi._event_manager import "
"EventConfig, Subscription, _EventWrapper, _EventManager"
)

code += "\n".join(stdlib_imports) + "\n"
Expand Down
180 changes: 177 additions & 3 deletions py/private/_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@

from __future__ import annotations

import logging
import queue
import threading
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.bidi.session import Session

logger = logging.getLogger(__name__)


@dataclass
class EventConfig:
Expand Down Expand Up @@ -86,6 +91,133 @@ def _camel_to_snake(name: str) -> str:
return "".join(result)


_UNSET = object()


class Subscription:
"""A pending expectation for a single BiDi event.

The event handler is registered when the subscription is created, so an
event fired by an action inside a ``with`` block is captured without a
race between the action and the wait::

with driver.network.expect_response("**/api/**") as response_info:
driver.find_element(By.ID, "load").click()
response = response_info.value

Exiting the ``with`` block waits for a matching event (raising
:class:`~selenium.common.exceptions.TimeoutException` if none arrives
within ``timeout``) and then removes the event handler. Outside a
``with`` block, call :meth:`wait` (or read :attr:`value`) to block until
the event arrives, and :meth:`cancel` to stop listening without waiting.

Exactly the first matching event is captured; matches arriving after the
subscription detaches are silently discarded.
"""

def __init__(
self,
register: Callable[[Callable], Any],
unregister: Callable[[Any], None],
predicate: Callable[[Any], bool] | None = None,
timeout: float = 30.0,
transform: Callable[[Any], Any] | None = None,
description: str = "event",
):
self._unregister = unregister
self._predicate = predicate
self._timeout = timeout
self._transform = transform
self._description = description
self._events: queue.SimpleQueue = queue.SimpleQueue()
self._value = _UNSET
self._detached = False
self._detach_lock = threading.Lock()
self._cleanups: list[Callable[[], None]] = []
self._token = register(self._on_event)

def _on_event(self, params: Any) -> None:
if self._detached:
return
try:
value = self._transform(params) if self._transform else params
if self._predicate is None or self._predicate(value):
self._events.put(value)
except Exception:
logger.exception("Predicate or transform for %s raised; event dropped", self._description)

def wait(self, timeout: float | None = None) -> Any:
"""Block until a matching event arrives and return it.

The first matching event is cached: later calls (and :attr:`value`)
return it without waiting again. The event handler is removed once a
match is captured; on timeout it stays registered so the wait can be
retried — call :meth:`cancel` to stop listening early.

Args:
timeout: Seconds to wait; defaults to the subscription's timeout.

Raises:
TimeoutException: If no matching event arrives in time.
"""
if self._value is not _UNSET:
return self._value
timeout = self._timeout if timeout is None else timeout
try:
self._value = self._events.get(timeout=timeout)
except queue.Empty:
raise TimeoutException(f"Timed out after {timeout}s waiting for {self._description}") from None
self._detach()
return self._value

@property
def value(self) -> Any:
"""The captured event, waiting for it first if necessary."""
return self.wait()

def cancel(self) -> None:
"""Stop listening without waiting for an event."""
self._detach()

def add_cleanup(self, cleanup: Callable[[], None]) -> None:
"""Run ``cleanup`` when the subscription detaches.

Used by ``expect_*`` helpers that register companion event handlers
(e.g. ``expect_download``) so those are removed alongside this one.
"""
self._cleanups.append(cleanup)

def _detach(self) -> None:
with self._detach_lock:
if self._detached:
return
self._detached = True
# Unregister and run cleanups outside the lock: they perform BiDi
# I/O and must not block a concurrent detach attempt.
try:
self._unregister(self._token)
except Exception:
logger.exception("Failed to remove event handler for %s", self._description)
for cleanup in self._cleanups:
try:
cleanup()
except Exception:
logger.exception("Subscription cleanup for %s failed", self._description)

def __enter__(self) -> Subscription:
return self

def __exit__(self, exc_type, exc, tb) -> bool:
if exc_type is None:
try:
self.wait()
finally:
self._detach()
else:
self._detach()
return False


class _EventManager:
"""Manages event subscriptions and callbacks."""

Expand All @@ -94,6 +226,7 @@ def __init__(self, conn, event_configs: dict[str, EventConfig]):
self.event_configs = event_configs
self.subscriptions: dict = {}
self._event_wrappers = {} # Cache of _EventWrapper objects
self._raw_wrappers = {} # Cache of raw-dict _EventWrapper objects
self._bidi_to_class = {config.bidi_event: config.event_class for config in event_configs.values()}
self._available_events = ", ".join(sorted(event_configs.keys()))
self._subscription_lock = threading.Lock()
Expand Down Expand Up @@ -144,10 +277,20 @@ def remove_callback_from_tracking(self, bidi_event: str, callback_id: int) -> No
if entry and callback_id in entry["callbacks"]:
entry["callbacks"].remove(callback_id)

def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int:
def add_event_handler(
self, event: str, callback: Callable, contexts: list[str] | None = None, raw: bool = False
) -> int:
event_config = self.validate_event(event)
# Use the event wrapper for add_callback
event_wrapper = self._event_wrappers.get(event_config.bidi_event)
# Use the event wrapper for add_callback. Raw handlers receive the
# unfiltered wire-level params dict instead of the typed dataclass,
# which may not carry every event field.
if raw:
event_wrapper = self._raw_wrappers.get(event_config.bidi_event)
if event_wrapper is None:
event_wrapper = _EventWrapper(event_config.bidi_event, dict)
self._raw_wrappers[event_config.bidi_event] = event_wrapper
else:
event_wrapper = self._event_wrappers.get(event_config.bidi_event)
callback_id = self.conn.add_callback(event_wrapper, callback)
self.subscribe_to_event(event_config.bidi_event, contexts)
self.add_callback_to_tracking(event_config.bidi_event, callback_id)
Expand All @@ -160,6 +303,37 @@ def remove_event_handler(self, event: str, callback_id: int) -> None:
self.remove_callback_from_tracking(event_config.bidi_event, callback_id)
self.unsubscribe_from_event(event_config.bidi_event)

def expect(
self,
event: str,
predicate: Callable[[Any], bool] | None = None,
timeout: float = 30.0,
transform: Callable[[Any], Any] | None = None,
raw: bool = False,
contexts: list[str] | None = None,
) -> Subscription:
"""Return a :class:`Subscription` capturing the next matching ``event``.

Args:
event: The event key to subscribe to.
predicate: Optional filter applied to each (transformed) event;
the first event for which it returns true is captured.
timeout: Default seconds the subscription waits for a match.
transform: Optional conversion applied to the event payload
before the predicate sees it and before it is returned.
raw: When true the handler receives the wire-level params dict
instead of the typed event dataclass.
contexts: Optional browsing context IDs to subscribe to.
"""
return Subscription(
register=lambda callback: self.add_event_handler(event, callback, contexts, raw=raw),
unregister=lambda callback_id: self.remove_event_handler(event, callback_id),
predicate=predicate,
timeout=timeout,
transform=transform,
description=f"event '{event}'",
)

def clear_event_handlers(self) -> None:
"""Clear all event handlers."""
with self._subscription_lock:
Expand Down
15 changes: 15 additions & 0 deletions py/private/_network_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ def globs_to_url_patterns(patterns: list | None) -> list[dict] | None:
return translated or None


def to_url_predicate(url_or_predicate) -> Callable | None:
"""Normalize an ``expect_*`` filter into a predicate over wrapped events.

A string is treated as a URL glob (``*``, ``**``, ``?``) matched against
the event's ``url`` attribute; a callable is returned unchanged; ``None``
matches everything.
"""
if url_or_predicate is None:
return None
if callable(url_or_predicate):
return url_or_predicate
regex = glob_to_regex(str(url_or_predicate))
return lambda event: bool(regex.match(event.url or ""))


class Request:
"""Wraps a BiDi network request event and provides request action methods.

Expand Down
Loading
Loading