From 8b13468c1a5a687f58d16173c37e6fd4896518bb Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 11:42:28 -0400 Subject: [PATCH 1/9] refactor(example): show how to load environment variables --- bots/example.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bots/example.py b/bots/example.py index c15ab0db..0edbfd64 100644 --- a/bots/example.py +++ b/bots/example.py @@ -19,6 +19,11 @@ # Do this first to initialize your bot bot = SilverbackBot() +# You can use environment variables to load sensitive data +# NOTE: Use `os.environ[""]` if you want your bot to fail without it +api_key = os.environ.get("SOME_SERVICE_API_KEY", "badkey") +# NOTE: You can use `silverback cluster vars` commands to configure variable groups + # Cannot call `bot.state` outside of an bot function handler # bot.state.something # NOTE: raises AttributeError From 1861981b65142989ebb391ce9d776532684aca49 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 11:51:22 -0400 Subject: [PATCH 2/9] refactor: add utility function to determine if something is ScalarType --- silverback/types.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/silverback/types.py b/silverback/types.py index f7d3b75f..a5ab34d8 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone from decimal import Decimal from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+ -from typing import Literal +from typing import Any, Literal, get_args from ape.logging import get_logger from pydantic import BaseModel, Field, RootModel, ValidationError, model_validator @@ -73,6 +73,14 @@ def as_row(self) -> "ScalarType | dict": # This is okay, preferable actually, because it means we can store ints outside that range +def is_scalar_type(val: Any) -> bool: + """Check if `val` is a `ScalarType` type""" + return any( + isinstance(val, d_type.__origin__ if hasattr(d_type, "__origin__") else d_type) + for d_type in get_args(ScalarType) + ) + + class ScalarDatapoint(_BaseDatapoint): type: Literal["scalar"] = "scalar" data: ScalarType From 3f5d981bc1b5857dcafc9f7521760b34f3d68a93 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:39:45 -0400 Subject: [PATCH 3/9] refactor(State): add user parameters to StateSnapshot --- silverback/main.py | 16 +++++++++++++--- silverback/state.py | 18 +++++++++++++++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/silverback/main.py b/silverback/main.py index 91dd1ddf..84f96c8b 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -27,7 +27,7 @@ ) from .settings import Settings from .state import StateSnapshot -from .types import ScalarType, SilverbackID, TaskType +from .types import ScalarType, SilverbackID, TaskType, is_scalar_type from .utils import encode_topics_to_string, parse_hexbytes_dict @@ -249,14 +249,24 @@ async def __load_snapshot_handler(self, startup_state: StateSnapshot): startup_state.last_nonce_used or -1, self.signer.nonce - 1 ) - # TODO: Load user custom state (should not start with `system:`) + # Load parameters from snapshot into state + for parameter_name, parameter_value in startup_state.parameters.items(): + if parameter_name.startswith("system:"): + logger.error(f"Cannot restore '{parameter_name}'") + else: + self.state[parameter_name] = parameter_value async def __create_snapshot_handler(self) -> StateSnapshot: return StateSnapshot( - # TODO: Migrate these to parameters (remove explicitly from state) last_block_seen=self.state.get("system:last_block_seen", -1), last_block_processed=self.state.get("system:last_block_processed", -1), last_nonce_used=self.state.get("system:last_nonce_used"), + parameters={ + param_name: param_value + for param_name in self.parameters + # NOTE: Do not backup invalid parameters + if is_scalar_type(param_value := self.state.get(param_name)) + }, ) @property diff --git a/silverback/state.py b/silverback/state.py index 523a9176..26e8143c 100644 --- a/silverback/state.py +++ b/silverback/state.py @@ -1,8 +1,9 @@ from pathlib import Path +from typing import Any from pydantic import BaseModel, Field -from .types import SilverbackID, UTCTimestamp, utc_now +from .types import ScalarType, SilverbackID, UTCTimestamp, utc_now class StateSnapshot(BaseModel): @@ -17,10 +18,25 @@ class StateSnapshot(BaseModel): # Last nonce used by signer last_nonce_used: int | None = None + # User-defined parameters + parameters: dict[str, ScalarType] = {} + # Last time the state was updated # NOTE: intended to use default when creating a model with this type last_updated: UTCTimestamp = Field(default_factory=utc_now) + def __dir__(self) -> list[str]: + return sorted([*super().__dir__(), *self.parameters]) + + def __getattr__(self, attr: str) -> Any: + try: + return super().__getattr__(attr) # type: ignore[misc] + except AttributeError: + if val := self.parameters.get(attr): + return val + + raise AttributeError(f"'{self.__class__.__qualname__}' object has no attribute '{attr}'") + class Datastore: """ From 7c3268387c2bb818f51714410191d3bc6b0ab9f6 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:54:22 -0400 Subject: [PATCH 4/9] feat: add managed parameters support --- bots/example.py | 17 ++++++++++++++--- silverback/main.py | 40 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/bots/example.py b/bots/example.py index 0edbfd64..42c6d321 100644 --- a/bots/example.py +++ b/bots/example.py @@ -31,6 +31,12 @@ usdc = tokens["USDC"] +# We can add "parameters", which are values in `bot.state` that are able to be updated +# by external triggers while the bot is in runtime +bot.add_parameter("bad_number", default=3) +# NOTE: the value specified by `default=` is only used if no value exists in the snapshot + + @bot.on_startup() def bot_startup(startup_state: StateSnapshot): # This is called just as the bot is put into "run" state, @@ -42,6 +48,11 @@ def bot_startup(startup_state: StateSnapshot): # This is a great place to set `bot.state` values bot.state.logs_processed = 0 # NOTE: Can put anything here, any python object works + # NOTE: Non-parameter state is `None` by default + + # Parameters are loaded from state snapshot (or set to the default above) + assert bot.state.bad_number == startup_state.bad_number or 3 + # NOTE: parameter values are stored in the state snapshot for successive runs return {"block_number": startup_state.last_block_seen} @@ -79,10 +90,10 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): @bot.on_(usdc.Transfer) # NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): - if log.log_index % 7 == 3: + if log.log_index % 7 == bot.state.bad_number: # If you raise any exception, Silverback will track the failure and keep running # NOTE: By default, if you have 3 tasks fail in a row, the bot will shutdown itself - raise ValueError("I don't like the number 3.") + raise ValueError(f"I don't like the number {bot.state.bad_number}.") # You can update state whenever you want bot.state.logs_processed += 1 @@ -102,7 +113,7 @@ async def handle_mints(log): # Any handler function can be async too async def exec_event2(log: ContractLog): token = Token.at(log.contract_address) - # All `bot.state` values are updated across all workers at the same time + # All `bot.state` values (including parameters) are updated across all workers at the same time bot.state.logs_processed += 1 # Do any other long running tasks... await asyncio.sleep(5) diff --git a/silverback/main.py b/silverback/main.py index 84f96c8b..66a847d1 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -51,6 +51,12 @@ def __hash__(self) -> int: return hash(self.model_dump_json()) +class ParameterInfo(BaseModel): + default: ScalarType | None + + # NOTE: Any other fields should have defaults + + class SharedState(defaultdict): """ Class containing the bot shared state that all workers can read from and write to. @@ -193,6 +199,9 @@ def call_override(account_instance, txn, *args, **kwargs): TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler ) + # NOTE: Parameters are create via `add_parameter` + self.__parameters: dict[str, ParameterInfo] = {} + def __register_system_task( self, task_type: TaskType, task_handler: Callable ) -> AsyncTaskiqDecoratedTask: @@ -249,12 +258,21 @@ async def __load_snapshot_handler(self, startup_state: StateSnapshot): startup_state.last_nonce_used or -1, self.signer.nonce - 1 ) + # NOTE: We need to load defaults in case user parameters are not in the snapshot yet + for parameter_name, parameter_info in self.parameters.items(): + self.state[parameter_name] = parameter_info.default + # Load parameters from snapshot into state for parameter_name, parameter_value in startup_state.parameters.items(): if parameter_name.startswith("system:"): logger.error(f"Cannot restore '{parameter_name}'") - else: - self.state[parameter_name] = parameter_value + continue + + elif parameter_name in self.parameters: + # NOTE: Keep both of these in sync (primarily for debugging) + self.__parameters[parameter_name].default = parameter_value + + self.state[parameter_name] = parameter_value async def __create_snapshot_handler(self) -> StateSnapshot: return StateSnapshot( @@ -269,6 +287,24 @@ async def __create_snapshot_handler(self) -> StateSnapshot: }, ) + @property + def parameters(self) -> dict[str, ParameterInfo]: + # NOTE: makes this variable read-only + return self.__parameters + + def add_parameter(self, param_name: str, default: ScalarType | None = None): + if "system:" in param_name: + raise ValueError("Cannot override system parameters") + + if param_name in self.parameters: + raise ValueError(f"{param_name} already added!") + + if default and not is_scalar_type(default): + raise ValueError(f"Default value type '{type(default)}' is not a valid scalar type.") + + # Update this to track parameter existance/default value/update handler + self.__parameters[param_name] = ParameterInfo(default=default) + @property def nonce(self) -> int: if not self.signer: From fbef74edd8fba71304336fbfe9eeb2d6b99211b0 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:55:33 -0400 Subject: [PATCH 5/9] refactor(types): add ParamChange datapoint type for parameter updates --- silverback/types.py | 53 +++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/silverback/types.py b/silverback/types.py index a5ab34d8..d3eff39a 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -5,7 +5,7 @@ from typing import Any, Literal, get_args from ape.logging import get_logger -from pydantic import BaseModel, Field, RootModel, ValidationError, model_validator +from pydantic import BaseModel, Field, RootModel, model_validator from pydantic.functional_serializers import PlainSerializer from typing_extensions import Annotated @@ -92,11 +92,22 @@ def as_row(self) -> ScalarType: return self.data +class ParamChange(_BaseDatapoint): + type: Literal["setparam"] = "setparam" + old: ScalarType | None + new: ScalarType + + # NOTE: Other datapoint types must be explicitly defined as subclasses of `_BaseDatapoint` # Users will have to import and use these directly # NOTE: Other datapoint types must be added to this union -Datapoint = ScalarDatapoint +Datapoint = ScalarDatapoint | ParamChange + + +def is_datapoint(val: Any) -> bool: + """`val` is a `Datapoint` type""" + return any(isinstance(val, d_type) for d_type in get_args(Datapoint)) class Datapoints(RootModel): @@ -104,28 +115,22 @@ class Datapoints(RootModel): @model_validator(mode="before") def parse_datapoints(cls, datapoints: dict) -> dict: - names_to_remove: dict[str, ValidationError] = {} - # Automatically convert raw scalar types - for name in datapoints: - if isinstance(datapoints[name], dict) and "type" in datapoints[name]: - try: - datapoints[name] = ScalarDatapoint.model_validate(datapoints[name]) - except ValidationError as e: - names_to_remove[name] = e - elif not isinstance(datapoints[name], Datapoint): - try: - datapoints[name] = ScalarDatapoint(data=datapoints[name]) - except ValidationError as e: - names_to_remove[name] = e - - # Prune and raise a warning about unconverted datapoints - for name in names_to_remove: - data = datapoints.pop(name) - logger.warning( - f"Cannot convert datapoint '{name}' of type '{type(data)}': {names_to_remove[name]}" - ) - - return datapoints + successfully_parsed_datapoints = {} + for name, datapoint in datapoints.items(): + if is_datapoint(datapoint): + successfully_parsed_datapoints[name] = datapoint + + elif is_scalar_type(datapoint): + # Automatically convert raw scalar types into datapoints + successfully_parsed_datapoints[name] = ScalarDatapoint(data=datapoints[name]) + + else: + # Prune and raise a warning about unconverted datapoints + logger.warning( + f"Cannot convert datapoint '{name}' of type '{type(datapoint)}': {datapoint}" + ) + + return successfully_parsed_datapoints # Add dict methods def get(self, key: str, default: Datapoint | None = None) -> Datapoint | None: From 4f0424f280ae7c9a080861aee662025b025fe301 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:56:38 -0400 Subject: [PATCH 6/9] feat(Runner): add param update system tasks --- silverback/main.py | 38 +++++++++++++++++++++++++++++++++++++- silverback/types.py | 2 ++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/silverback/main.py b/silverback/main.py index 66a847d1..36d5a375 100644 --- a/silverback/main.py +++ b/silverback/main.py @@ -27,7 +27,7 @@ ) from .settings import Settings from .state import StateSnapshot -from .types import ScalarType, SilverbackID, TaskType, is_scalar_type +from .types import ParamChange, ScalarType, SilverbackID, TaskType, is_scalar_type from .utils import encode_topics_to_string, parse_hexbytes_dict @@ -198,6 +198,12 @@ def call_override(account_instance, txn, *args, **kwargs): self._create_snapshot = self.__register_system_task( TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler ) + self._set_param = self.__register_system_task( + TaskType.SYSTEM_SET_PARAM, self.__param_set_handler + ) + self._batch_set_param = self.__register_system_task( + TaskType.SYSTEM_SET_PARAM_BATCH, self.__batch_param_set_handler + ) # NOTE: Parameters are create via `add_parameter` self.__parameters: dict[str, ParameterInfo] = {} @@ -305,6 +311,36 @@ def add_parameter(self, param_name: str, default: ScalarType | None = None): # Update this to track parameter existance/default value/update handler self.__parameters[param_name] = ParameterInfo(default=default) + async def __param_set_handler(self, param_name: str, new_value: ScalarType) -> ParamChange: + if "system:" in param_name: + raise ValueError(f"Cannot update system parameter '{param_name}'") + + elif param_name not in self.parameters: + raise ValueError(f"Unrecognized parameter '{param_name}'") + + change = ParamChange(old=self.state[param_name], new=new_value) + logger.success(f"Update: app.state['{param_name}'] = {new_value}") + self.state[param_name] = new_value + return change + + async def __batch_param_set_handler( + self, parameter_updates: dict[str, ScalarType] + ) -> dict[str, ParamChange]: + datapoints = {} + for param_name, new_value in parameter_updates.items(): + if "system:" in param_name: + logger.error(f"Cannot update system parameter '{param_name}'") + + elif param_name not in self.parameters: + logger.error(f"Unrecognized parameter '{param_name}'") + + else: + datapoints[param_name] = ParamChange(old=self.state[param_name], new=new_value) + logger.success(f"Update: app.state['{param_name}'] = {new_value}") + self.state[param_name] = new_value + + return datapoints + @property def nonce(self) -> int: if not self.signer: diff --git a/silverback/types.py b/silverback/types.py index d3eff39a..72f3be90 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -19,6 +19,8 @@ class TaskType(str, Enum): SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata" SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot" SYSTEM_CREATE_SNAPSHOT = "system:create-snapshot" + SYSTEM_SET_PARAM = "system:set-param" + SYSTEM_SET_PARAM_BATCH = "system:batch-param" # User-accessible Tasks STARTUP = "user:startup" From f8fe3582db59a9994c296fae7e40d9219ff7c278 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 12:57:46 -0400 Subject: [PATCH 7/9] feat(CLI): add `silverback set-param` cli command to update parameters --- docs/commands/run.rst | 4 +++ silverback/_cli.py | 77 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/docs/commands/run.rst b/docs/commands/run.rst index 4d11b86c..d5816930 100644 --- a/docs/commands/run.rst +++ b/docs/commands/run.rst @@ -9,3 +9,7 @@ CLI commands for local development of running Silverback bots and task workers. .. click:: silverback._cli:worker :prog: silverback worker :nested: none + +.. click:: silverback._cli:set_param + :prog: silverback set-param + :nested: none diff --git a/silverback/_cli.py b/silverback/_cli.py index 4e6330d3..832c14d3 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -3,6 +3,7 @@ import secrets from collections import defaultdict from datetime import datetime, timedelta, timezone +from decimal import Decimal from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -37,6 +38,7 @@ token_amount_callback, ) from .exceptions import ClientError +from .types import ScalarType, is_scalar_type from .utils import get_chain_info if TYPE_CHECKING: @@ -154,6 +156,81 @@ def run(cli_ctx, account, runner_class, record, recorder_class, max_exceptions, asyncio.run(runner.run(), debug=debug) +def convert_param(val) -> ScalarType: + if is_scalar_type(val): + return val + + elif val.lower() in ("f", "false"): + return False + + elif val.lower() in ("t", "true"): + return True + + try: + return int(val) + + except Exception: + pass + + try: + return float(val) + + except Exception: + pass + + # NOTE: Decimal allows the most values, so leave last + return Decimal(val) + + +def convert_param_kwargs(ctx, param, values) -> dict[str, ScalarType]: + converted_params = {} + for kwarg in values: + name, value = kwarg.split("=") + converted_params[name] = convert_param(value) + + return converted_params + + +@cli.command(cls=ConnectedProviderCommand, help="Set parameters against a running silverback app") +@network_option( + default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"), + callback=_network_callback, +) +@click.option( + "-p", + "--param", + "param_updates", + multiple=True, + callback=convert_param_kwargs, +) +@click.argument("bot", required=False, callback=bot_path_callback) +def set_param(param_updates, bot): + if len(param_updates) == 0: + raise click.UsageError("Must change at least one param via `-p/--param`") + + elif len(param_updates) > 1: + kicker = bot._batch_set_param + args = [param_updates] + + else: + kicker = bot._set_param + args = list(param_updates.items())[0] + + async def set_parameters(): + await bot.broker.startup() + task = await kicker.kiq(*args) + result = await task.wait_result() + await bot.broker.shutdown() + + if result.is_err: + raise click.UsageError(str(result.error)) + + else: + click.echo(result.return_value) + + asyncio.run(set_parameters()) + + @cli.command(section="Local Commands") @click.option( "--use-docker", From cf10603a878acb35b4e8dd3289fe22e48ba5b9b5 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 23 Apr 2025 14:40:07 -0400 Subject: [PATCH 8/9] docs(Dev): add section on defining and using user-managed parameters --- docs/userguides/development.md | 41 ++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 73866c53..85d16772 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -394,6 +394,47 @@ as this could lead to extremely undesirable behavior, stuck transactions, transa or **loss of funds**. ``` +### Managed Parameters + +Silverback has support for defining numeric/boolean values in `bot.state` that can be updated during runtime operation of the bot. +This is very useful for implementing features in your bot such as operational modes, parametrized signal processing algorithms that can be adjusted in real-time, or otherwise allowing the operational behavior of your bot to become configurable above and beyond what is possible through simple environment variables. +Further, these parameters are backed up and stored through Silverback's state snapshotting feature, which means they retain their changes during bot resets and new deployments on the [Silverback Platform](./managing.html). + +To use this feature in your bot, you will use the `bot.add_parameter` function to define your parameter's name and default value (the value that is loaded in `bot.state.` if no value is detected in the state snapshot on loading). +Here's an example: + +```py +... + +bot = SilverbackBot() + +bot.add_parameter("my_parameter", default=0.1) +``` + +You can then access this value inside of any bot handler functions via `bot.state.my_parameter` or `bot.state["my_parameter"]`: + +```py +... + +@bot.on_(chain.blocks) +async def measure_something(block): + bot.state.measurement *= bot.state.my_parameter * block.base_fee + +... +``` + +```{warning} +Since parameters are loaded into `bot.state`, they are not accessible outside of your bot's handler functions. +It is also recommended that you do not modify them dynamically, although that behavior is allowed. +``` + +```{note} +Parameter definitions are defined under `bot.parameters` but do not contain their current value, which must be accessed through `bot.state`. +However, this can be useful if you need to access your parameter's properties such as the default value. +``` + +If you want to test modifying your parameter when testing locally, first set your model up using [Distributed Execution](#distributed-execution), and then use the [`silverback set-param`](../commands/run.html#silverback-set-param) command. + ## Running your Bot Once you have programmed your bot, From 78245027c031e563b0e76ccfec01117d519a4854 Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Wed, 14 Jan 2026 13:23:35 -0500 Subject: [PATCH 9/9] fix(Types): added missing abstractmethods to new datapoint --- silverback/types.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/silverback/types.py b/silverback/types.py index 72f3be90..f5b18995 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -99,6 +99,12 @@ class ParamChange(_BaseDatapoint): old: ScalarType | None new: ScalarType + def render(self) -> str: + return str(self.as_row()) + + def as_row(self) -> dict: + return dict(old=self.old, new=self.new) + # NOTE: Other datapoint types must be explicitly defined as subclasses of `_BaseDatapoint` # Users will have to import and use these directly