Skip to content
22 changes: 19 additions & 3 deletions bots/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@
# Do this first to initialize your bot
bot = SilverbackBot()

# You can use environment variables to load sensitive data
# NOTE: Use `os.environ["<NAME>"]` if you want your bot to fail without it
api_key = os.environ.get("SOME_SERVICE_API_KEY", "badkey")
# NOTE: You can use `silverback cluster vars` commands to configure variable groups

# Cannot call `bot.state` outside of an bot function handler
# bot.state.something # NOTE: raises AttributeError

# NOTE: Don't do any networking until after initializing bot
usdc = tokens["USDC"]


# We can add "parameters", which are values in `bot.state` that are able to be updated
# by external triggers while the bot is in runtime
bot.add_parameter("bad_number", default=3)
# NOTE: the value specified by `default=` is only used if no value exists in the snapshot


@bot.on_startup()
def bot_startup(startup_state: StateSnapshot):
# This is called just as the bot is put into "run" state,
Expand All @@ -37,6 +48,11 @@ def bot_startup(startup_state: StateSnapshot):
# This is a great place to set `bot.state` values
bot.state.logs_processed = 0
# NOTE: Can put anything here, any python object works
# NOTE: Non-parameter state is `None` by default

# Parameters are loaded from state snapshot (or set to the default above)
assert bot.state.bad_number == startup_state.bad_number or 3
# NOTE: parameter values are stored in the state snapshot for successive runs

return {"block_number": startup_state.last_block_seen}

Expand Down Expand Up @@ -74,10 +90,10 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
@bot.on_(usdc.Transfer)
# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type
def exec_event1(log):
if log.log_index % 7 == 3:
if log.log_index % 7 == bot.state.bad_number:
# If you raise any exception, Silverback will track the failure and keep running
# NOTE: By default, if you have 3 tasks fail in a row, the bot will shutdown itself
raise ValueError("I don't like the number 3.")
raise ValueError(f"I don't like the number {bot.state.bad_number}.")

# You can update state whenever you want
bot.state.logs_processed += 1
Expand All @@ -97,7 +113,7 @@ async def handle_mints(log):
# Any handler function can be async too
async def exec_event2(log: ContractLog):
token = Token.at(log.contract_address)
# All `bot.state` values are updated across all workers at the same time
# All `bot.state` values (including parameters) are updated across all workers at the same time
bot.state.logs_processed += 1
# Do any other long running tasks...
await asyncio.sleep(5)
Expand Down
4 changes: 4 additions & 0 deletions docs/commands/run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ CLI commands for local development of running Silverback bots and task workers.
.. click:: silverback._cli:worker
:prog: silverback worker
:nested: none

.. click:: silverback._cli:set_param
:prog: silverback set-param
:nested: none
41 changes: 41 additions & 0 deletions docs/userguides/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,47 @@ as this could lead to extremely undesirable behavior, stuck transactions, transa
or **loss of funds**.
```

### Managed Parameters

Silverback has support for defining numeric/boolean values in `bot.state` that can be updated during runtime operation of the bot.
This is very useful for implementing features in your bot such as operational modes, parametrized signal processing algorithms that can be adjusted in real-time, or otherwise allowing the operational behavior of your bot to become configurable above and beyond what is possible through simple environment variables.
Further, these parameters are backed up and stored through Silverback's state snapshotting feature, which means they retain their changes during bot resets and new deployments on the [Silverback Platform](./managing.html).

To use this feature in your bot, you will use the `bot.add_parameter` function to define your parameter's name and default value (the value that is loaded in `bot.state.<your_parameter>` if no value is detected in the state snapshot on loading).
Here's an example:

```py
...

bot = SilverbackBot()

bot.add_parameter("my_parameter", default=0.1)
```

You can then access this value inside of any bot handler functions via `bot.state.my_parameter` or `bot.state["my_parameter"]`:

```py
...

@bot.on_(chain.blocks)
async def measure_something(block):
bot.state.measurement *= bot.state.my_parameter * block.base_fee

...
```

```{warning}
Since parameters are loaded into `bot.state`, they are not accessible outside of your bot's handler functions.
It is also recommended that you do not modify them dynamically, although that behavior is allowed.
```

```{note}
Parameter definitions are defined under `bot.parameters` but do not contain their current value, which must be accessed through `bot.state`.
However, this can be useful if you need to access your parameter's properties such as the default value.
```

If you want to test modifying your parameter when testing locally, first set your model up using [Distributed Execution](#distributed-execution), and then use the [`silverback set-param`](../commands/run.html#silverback-set-param) command.

## Running your Bot

Once you have programmed your bot,
Expand Down
77 changes: 77 additions & 0 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import secrets
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from pathlib import Path
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -37,6 +38,7 @@
token_amount_callback,
)
from .exceptions import ClientError
from .types import ScalarType, is_scalar_type
from .utils import get_chain_info

if TYPE_CHECKING:
Expand Down Expand Up @@ -154,6 +156,81 @@ def run(cli_ctx, account, runner_class, record, recorder_class, max_exceptions,
asyncio.run(runner.run(), debug=debug)


def convert_param(val) -> ScalarType:
if is_scalar_type(val):
return val

elif val.lower() in ("f", "false"):
return False

elif val.lower() in ("t", "true"):
return True

try:
return int(val)

except Exception:
pass

try:
return float(val)

except Exception:
pass

# NOTE: Decimal allows the most values, so leave last
return Decimal(val)


def convert_param_kwargs(ctx, param, values) -> dict[str, ScalarType]:
converted_params = {}
for kwarg in values:
name, value = kwarg.split("=")
converted_params[name] = convert_param(value)

return converted_params


@cli.command(cls=ConnectedProviderCommand, help="Set parameters against a running silverback app")
@network_option(
default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"),
callback=_network_callback,
)
@click.option(
"-p",
"--param",
"param_updates",
multiple=True,
callback=convert_param_kwargs,
)
@click.argument("bot", required=False, callback=bot_path_callback)
def set_param(param_updates, bot):
if len(param_updates) == 0:
raise click.UsageError("Must change at least one param via `-p/--param`")

elif len(param_updates) > 1:
kicker = bot._batch_set_param
args = [param_updates]

else:
kicker = bot._set_param
args = list(param_updates.items())[0]

async def set_parameters():
await bot.broker.startup()
task = await kicker.kiq(*args)
result = await task.wait_result()
await bot.broker.shutdown()

if result.is_err:
raise click.UsageError(str(result.error))

else:
click.echo(result.return_value)

asyncio.run(set_parameters())


@cli.command(section="Local Commands")
@click.option(
"--use-docker",
Expand Down
88 changes: 85 additions & 3 deletions silverback/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .settings import Settings
from .state import StateSnapshot
from .types import ScalarType, SilverbackID, TaskType
from .types import ParamChange, ScalarType, SilverbackID, TaskType, is_scalar_type
from .utils import encode_topics_to_string, parse_hexbytes_dict


Expand All @@ -51,6 +51,12 @@ def __hash__(self) -> int:
return hash(self.model_dump_json())


class ParameterInfo(BaseModel):
default: ScalarType | None

# NOTE: Any other fields should have defaults


class SharedState(defaultdict):
"""
Class containing the bot shared state that all workers can read from and write to.
Expand Down Expand Up @@ -192,6 +198,15 @@ def call_override(account_instance, txn, *args, **kwargs):
self._create_snapshot = self.__register_system_task(
TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler
)
self._set_param = self.__register_system_task(
TaskType.SYSTEM_SET_PARAM, self.__param_set_handler
)
self._batch_set_param = self.__register_system_task(
TaskType.SYSTEM_SET_PARAM_BATCH, self.__batch_param_set_handler
)

# NOTE: Parameters are create via `add_parameter`
self.__parameters: dict[str, ParameterInfo] = {}

def __register_system_task(
self, task_type: TaskType, task_handler: Callable
Expand Down Expand Up @@ -249,16 +264,83 @@ async def __load_snapshot_handler(self, startup_state: StateSnapshot):
startup_state.last_nonce_used or -1, self.signer.nonce - 1
)

# TODO: Load user custom state (should not start with `system:`)
# NOTE: We need to load defaults in case user parameters are not in the snapshot yet
for parameter_name, parameter_info in self.parameters.items():
self.state[parameter_name] = parameter_info.default

# Load parameters from snapshot into state
for parameter_name, parameter_value in startup_state.parameters.items():
if parameter_name.startswith("system:"):
logger.error(f"Cannot restore '{parameter_name}'")
continue

elif parameter_name in self.parameters:
# NOTE: Keep both of these in sync (primarily for debugging)
self.__parameters[parameter_name].default = parameter_value

self.state[parameter_name] = parameter_value

async def __create_snapshot_handler(self) -> StateSnapshot:
return StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_seen=self.state.get("system:last_block_seen", -1),
last_block_processed=self.state.get("system:last_block_processed", -1),
last_nonce_used=self.state.get("system:last_nonce_used"),
parameters={
param_name: param_value
for param_name in self.parameters
# NOTE: Do not backup invalid parameters
if is_scalar_type(param_value := self.state.get(param_name))
},
)

@property
def parameters(self) -> dict[str, ParameterInfo]:
# NOTE: makes this variable read-only
return self.__parameters

def add_parameter(self, param_name: str, default: ScalarType | None = None):
if "system:" in param_name:
raise ValueError("Cannot override system parameters")

if param_name in self.parameters:
raise ValueError(f"{param_name} already added!")

if default and not is_scalar_type(default):
raise ValueError(f"Default value type '{type(default)}' is not a valid scalar type.")

# Update this to track parameter existance/default value/update handler
self.__parameters[param_name] = ParameterInfo(default=default)

async def __param_set_handler(self, param_name: str, new_value: ScalarType) -> ParamChange:
if "system:" in param_name:
raise ValueError(f"Cannot update system parameter '{param_name}'")

elif param_name not in self.parameters:
raise ValueError(f"Unrecognized parameter '{param_name}'")

change = ParamChange(old=self.state[param_name], new=new_value)
logger.success(f"Update: app.state['{param_name}'] = {new_value}")
self.state[param_name] = new_value
return change

async def __batch_param_set_handler(
self, parameter_updates: dict[str, ScalarType]
) -> dict[str, ParamChange]:
datapoints = {}
for param_name, new_value in parameter_updates.items():
if "system:" in param_name:
logger.error(f"Cannot update system parameter '{param_name}'")

elif param_name not in self.parameters:
logger.error(f"Unrecognized parameter '{param_name}'")

else:
datapoints[param_name] = ParamChange(old=self.state[param_name], new=new_value)
logger.success(f"Update: app.state['{param_name}'] = {new_value}")
self.state[param_name] = new_value

return datapoints

@property
def nonce(self) -> int:
if not self.signer:
Expand Down
18 changes: 17 additions & 1 deletion silverback/state.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field

from .types import SilverbackID, UTCTimestamp, utc_now
from .types import ScalarType, SilverbackID, UTCTimestamp, utc_now


class StateSnapshot(BaseModel):
Expand All @@ -17,10 +18,25 @@ class StateSnapshot(BaseModel):
# Last nonce used by signer
last_nonce_used: int | None = None

# User-defined parameters
parameters: dict[str, ScalarType] = {}

# Last time the state was updated
# NOTE: intended to use default when creating a model with this type
last_updated: UTCTimestamp = Field(default_factory=utc_now)

def __dir__(self) -> list[str]:
return sorted([*super().__dir__(), *self.parameters])

def __getattr__(self, attr: str) -> Any:
try:
return super().__getattr__(attr) # type: ignore[misc]
except AttributeError:
if val := self.parameters.get(attr):
return val

raise AttributeError(f"'{self.__class__.__qualname__}' object has no attribute '{attr}'")


class Datastore:
"""
Expand Down
Loading
Loading