diff --git a/Dockerfile b/Dockerfile index d718c673..788d02c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,11 +16,13 @@ RUN pip wheel . --wheel-dir=/wheels # Install from wheels FROM ghcr.io/apeworx/ape:${BASE_APE_IMAGE_TAG:-latest} USER root -COPY --from=builder /wheels /wheels +COPY --from=builder /wheels/*.whl /wheels RUN pip install --upgrade pip \ - && pip install silverback \ + && pip install \ + --no-cache-dir --no-index --find-links=/wheels \ 'taskiq-sqs>=0.0.11' \ - --no-cache-dir --find-links=/wheels + silverback + USER harambe ENTRYPOINT ["silverback"] diff --git a/example.py b/example.py index 900b79e8..70737d54 100644 --- a/example.py +++ b/example.py @@ -7,29 +7,29 @@ from ape_tokens import tokens # type: ignore[import] from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp, StateSnapshot +from silverback import CircuitBreaker, SilverbackBot, StateSnapshot -# Do this first to initialize your app -app = SilverbackApp() +# Do this first to initialize your bot +bot = SilverbackBot() -# Cannot call `app.state` outside of an app function handler -# app.state.something # NOTE: raises AttributeError +# Cannot call `bot.state` outside of an bot function handler +# bot.state.something # NOTE: raises AttributeError -# NOTE: Don't do any networking until after initializing app +# NOTE: Don't do any networking until after initializing bot USDC = tokens["USDC"] YFI = tokens["YFI"] -@app.on_startup() -def app_startup(startup_state: StateSnapshot): - # This is called just as the app is put into "run" state, +@bot.on_startup() +def bot_startup(startup_state: StateSnapshot): + # This is called just as the bot is put into "run" state, # and handled by the first available worker # Any exception raised on startup aborts immediately: # raise Exception # NOTE: raises StartupFailure - # This is a great place to set `app.state` values - app.state.logs_processed = 0 + # This is a great place to set `bot.state` values + bot.state.logs_processed = 0 # NOTE: Can put anything here, any python object works return {"block_number": startup_state.last_block_seen} @@ -41,7 +41,7 @@ def execute(self, query: str): pass # Handle query somehow... -@app.on_worker_startup() +@bot.on_worker_startup() # NOTE: This event is triggered internally, do not use unless you know what you're doing def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state # NOTE: Worker state is per-worker, not shared with other workers @@ -51,12 +51,12 @@ def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint t # Any exception raised on worker startup aborts immediately: # raise Exception # NOTE: raises StartupFailure - # Cannot call `app.state` because it is not set up yet on worker startup functions - # app.state.something # NOTE: raises AttributeError + # Cannot call `bot.state` because it is not set up yet on worker startup functions + # bot.state.something # NOTE: raises AttributeError # This is how we trigger off of new blocks -@app.on_(chain.blocks) +@bot.on_(chain.blocks) # NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI` # NOTE: If you need something from worker state, you have to use taskiq context def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): @@ -66,48 +66,48 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) +@bot.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) # NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): if log.log_index % 7 == 3: # If you raise any exception, Silverback will track the failure and keep running - # NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself + # NOTE: By default, if you have 3 tasks fail in a row, the bot will shutdown itself raise ValueError("I don't like the number 3.") # You can update state whenever you want - app.state.logs_processed += 1 + bot.state.logs_processed += 1 return {"amount": log.amount} -@app.on_(YFI.Approval) +@bot.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - # All `app.state` values are updated across all workers at the same time - app.state.logs_processed += 1 + # All `bot.state` values are updated across all workers at the same time + bot.state.logs_processed += 1 # Do any other long running tasks... await asyncio.sleep(5) return log.amount -@app.on_(chain.blocks) +@bot.on_(chain.blocks) # NOTE: You can have multiple handlers for any trigger we support def check_logs(log): - if app.state.logs_processed > 20: - # If you ever want the app to immediately shutdown under some scenario, raise this exception + if bot.state.logs_processed > 20: + # If you ever want the bot to immediately shutdown under some scenario, raise this exception raise CircuitBreaker("Oopsie!") # A final job to execute on Silverback shutdown -@app.on_shutdown() -def app_shutdown(): +@bot.on_shutdown() +def bot_shutdown(): # NOTE: Any exception raised on worker shutdown is ignored: # raise Exception return {"some_metric": 123} # Just in case you need to release some resources or something inside each worker -@app.on_worker_shutdown() +@bot.on_worker_shutdown() def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here # This is a good time to release resources state.db = None diff --git a/silverback/__init__.py b/silverback/__init__.py index a366aba1..d393af90 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,10 +1,10 @@ -from .application import SilverbackApp +from .main import SilverbackBot from .exceptions import CircuitBreaker, SilverbackException from .state import StateSnapshot __all__ = [ "StateSnapshot", "CircuitBreaker", - "SilverbackApp", + "SilverbackBot", "SilverbackException", ] diff --git a/silverback/_cli.py b/silverback/_cli.py index fe193e47..fc2f30b1 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -51,7 +51,7 @@ @click.group(cls=SectionedHelpGroup) def cli(): """ - Silverback: Build Python apps that react to on-chain events + Silverback: Build Python bots that react to on-chain events To learn more about our cloud offering, please check out https://silverback.apeworx.io """ @@ -110,7 +110,7 @@ def _network_callback(ctx, param, val): @click.option("-x", "--max-exceptions", type=int, default=3) @click.argument("bot", required=False, callback=bot_path_callback) def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot): - """Run Silverback application""" + """Run Silverback bot""" if not runner_class: # NOTE: Automatically select runner class @@ -120,7 +120,7 @@ def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot): runner_class = PollingRunner else: raise click.BadOptionUsage( - option_name="network", message="Network choice cannot support running app" + option_name="network", message="Network choice cannot support running bot" ) runner = runner_class( @@ -213,7 +213,7 @@ def login(auth: FiefAuth): @cli.group(cls=SectionedHelpGroup, section="Cloud Commands (https://silverback.apeworx.io)") def cluster(): - """Manage a Silverback hosted application cluster + """Manage a Silverback hosted bot cluster For clusters on the Silverback Platform, please provide a name for the cluster to access under your platform account via `-c WORKSPACE/NAME`""" diff --git a/silverback/exceptions.py b/silverback/exceptions.py index 371dff83..554f5a88 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -48,11 +48,11 @@ def __init__(self): class Halt(SilverbackException): def __init__(self): - super().__init__("App halted, must restart manually") + super().__init__("Bot halted, must restart manually") class CircuitBreaker(Halt): - """Custom exception (created by user) that will trigger an application shutdown.""" + """Custom exception (created by user) that will trigger an bot shutdown.""" def __init__(self, message: str): super(SilverbackException, self).__init__(message) diff --git a/silverback/application.py b/silverback/main.py similarity index 92% rename from silverback/application.py rename to silverback/main.py index f97e4904..015ec1a5 100644 --- a/silverback/application.py +++ b/silverback/main.py @@ -21,9 +21,9 @@ class SystemConfig(BaseModel): # NOTE: Do not change this datatype unless major breaking - # NOTE: Useful for determining if Runner can handle this app + # NOTE: Useful for determining if Runner can handle this bot sdk_version: str - # NOTE: Useful for specifying what task types can be specified by app + # NOTE: Useful for specifying what task types can be specified by bot task_types: list[str] @@ -37,7 +37,7 @@ class TaskData(BaseModel): class SharedState(defaultdict): """ - Class containing the application shared state that all workers can read from and write to. + Class containing the bot shared state that all workers can read from and write to. ```{warning} This is not networked in any way, nor is it multi-process safe, but will be @@ -46,19 +46,19 @@ class SharedState(defaultdict): Usage example:: - @app.on_(...) + @bot.on_(...) def do_something_with_state(value): # Read from state using `getattr` - ... = app.state.something + ... = bot.state.something # Set state using `setattr` - app.state.something = ... + bot.state.something = ... # Read from state using `getitem` - ... = app.state["something"] + ... = bot.state["something"] # Set state using setitem - app.state["something"] = ... + bot.state["something"] = ... """ # TODO: This class does not have thread-safe access control, but should remain safe due to @@ -82,22 +82,22 @@ def __setattr__(self, attr, val): super().__setitem__(attr, val) -class SilverbackApp(ManagerAccessMixin): +class SilverbackBot(ManagerAccessMixin): """ - The application singleton. Must be initialized prior to use. + The bot singleton. Must be initialized prior to use. Usage example:: - from silverback import SilverbackApp + from silverback import SilverbackBot - app = SilverbackApp() + bot = SilverbackBot() - ... # Connection has been initialized, can call broker methods e.g. `app.on_(...)` + ... # Connection has been initialized, can call broker methods e.g. `bot.on_(...)` """ def __init__(self, settings: Settings | None = None): """ - Create app + Create bot Args: settings (~:class:`silverback.settings.Settings` | None): Settings override. @@ -111,7 +111,7 @@ def __init__(self, settings: Settings | None = None): provider = provider_context.__enter__() self.identifier = SilverbackID( - name=settings.APP_NAME, + name=settings.BOT_NAME, network=provider.network.name, ecosystem=provider.network.ecosystem.name, ) @@ -123,7 +123,7 @@ def __init__(self, settings: Settings | None = None): settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds()) settings_str = "\n ".join(f'{key}="{val}"' for key, val in settings.dict().items() if val) - logger.info(f"Loading Silverback App with settings:\n {settings_str}") + logger.info(f"Loading Silverback Bot with settings:\n {settings_str}") self.broker = settings.get_broker() self.tasks: dict[TaskType, list[TaskData]] = { @@ -146,7 +146,7 @@ def __init__(self, settings: Settings | None = None): network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" logger.success( - f'Loaded Silverback App:\n NETWORK="{network_choice}"' + f'Loaded Silverback Bot:\n NETWORK="{network_choice}"' f"{signer_str}{new_block_timeout_str}" ) @@ -198,7 +198,7 @@ def __get_user_all_taskdata_handler(self) -> list[TaskData]: return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l] async def __load_snapshot_handler(self, startup_state: StateSnapshot): - # NOTE: *DO NOT USE* in Runner, as it will not be updated by the app + # NOTE: *DO NOT USE* in Runner, as it will not be updated by the bot self.state = SharedState() # NOTE: attribute does not exist before this task is executed, # ensuring no one uses it during worker startup @@ -295,11 +295,11 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: def on_startup(self) -> Callable: """ Code that will be exected by one worker after worker startup, but before the - application is put into the "run" state by the Runner. + bot is put into the "run" state by the Runner. Usage example:: - @app.on_startup() + @bot.on_startup() def do_something_on_startup(startup_state: StateSnapshot): ... # Reprocess missed events or blocks """ @@ -308,13 +308,13 @@ def do_something_on_startup(startup_state: StateSnapshot): def on_shutdown(self) -> Callable: """ Code that will be exected by one worker before worker shutdown, after the - Runner has decided to put the application into the "shutdown" state. + Runner has decided to put the bot into the "shutdown" state. Usage example:: - @app.on_shutdown() + @bot.on_shutdown() def do_something_on_shutdown(): - ... # Record final state of app + ... # Record final state of bot """ return self.broker_task_decorator(TaskType.SHUTDOWN) @@ -330,7 +330,7 @@ def on_worker_startup(self) -> Callable: Usage example:: - @app.on_worker_startup() + @bot.on_worker_startup() def do_something_on_startup(state): ... # Can provision resources, or add things to `state`. """ @@ -348,7 +348,7 @@ def on_worker_shutdown(self) -> Callable: Usage example:: - @app.on_worker_shutdown() + @bot.on_worker_shutdown() def do_something_on_shutdown(state): ... # Update some external service, perhaps using information from `state`. """ @@ -367,13 +367,13 @@ def on_( Args: container: (BlockContainer | ContractEvent): The event source to watch. new_block_timeout: (int | None): Override for block timeout that is acceptable. - Defaults to whatever the app's settings are for default polling timeout are. + Defaults to whatever the bot's settings are for default polling timeout are. start_block (int | None): block number to start processing events from. Defaults to whatever the latest block is. Raises: :class:`~silverback.exceptions.InvalidContainerTypeError`: - If the type of `container` is not configurable for the app. + If the type of `container` is not configurable for the bot. """ if isinstance(container, BlockContainer): if new_block_timeout is not None: diff --git a/silverback/runner.py b/silverback/runner.py index 8669e3dc..7f24827f 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,10 +11,10 @@ from taskiq import AsyncTaskiqTask from taskiq.kicker import AsyncKicker -from .application import SilverbackApp, SystemConfig, TaskData +from .main import SilverbackBot, SystemConfig, TaskData from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult -from .state import AppDatastore, StateSnapshot +from .state import Datastore, StateSnapshot from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import TaskType from .utils import ( @@ -28,18 +28,18 @@ class BaseRunner(ABC): def __init__( self, - # TODO: Make fully stateless by replacing `app` with `broker` and `identifier` - app: SilverbackApp, + # TODO: Make fully stateless by replacing `bot` with `broker` and `identifier` + bot: SilverbackBot, *args, max_exceptions: int = 3, recorder: BaseRecorder | None = None, **kwargs, ): - self.app = app + self.bot = bot # TODO: Make datastore optional and settings-driven # TODO: Allow configuring datastore class - self.datastore = AppDatastore() + self.datastore = Datastore() self.recorder = recorder self.max_exceptions = max_exceptions @@ -49,7 +49,7 @@ def __init__( def _create_task_kicker(self, task_data: TaskData) -> AsyncKicker: return AsyncKicker( - task_name=task_data.name, broker=self.app.broker, labels=task_data.labels + task_name=task_data.name, broker=self.bot.broker, labels=task_data.labels ) def _create_system_task_kicker(self, task_type: TaskType) -> AsyncKicker: @@ -81,7 +81,7 @@ async def _checkpoint( if not self._snapshotting_supported: return # Can't support this feature - task = await self.app._create_snapshot.kiq(last_block_seen, last_block_processed) + task = await self.bot._create_snapshot.kiq(last_block_seen, last_block_processed) if (result := await task.wait_result()).is_err: logger.error(f"Error saving snapshot: {result.error}") else: @@ -101,7 +101,7 @@ async def _event_task(self, task_data: TaskData): async def run(self): """ - Run the task broker client for the assembled ``SilverbackApp`` application. + Run the task broker client for the assembled ``SilverbackBot`` bot. Will listen for events against the connected provider (using `ManagerAccessMixin` context), and process them by kicking events over to the configured broker. @@ -113,7 +113,7 @@ async def run(self): If there are no configured tasks to execute. """ # Initialize broker (run worker startup events) - await self.app.broker.startup() + await self.bot.broker.startup() # Obtain system configuration for worker result = await run_taskiq_task_wait_result( @@ -153,7 +153,7 @@ async def run(self): last_block_processed=-1, ) # Use empty snapshot - elif not (startup_state := await self.datastore.init(app_id=self.app.identifier)): + elif not (startup_state := await self.datastore.init(bot_id=self.bot.identifier)): logger.warning("No state snapshot detected, using empty snapshot") startup_state = StateSnapshot( # TODO: Migrate these to parameters (remove explicitly from state) @@ -164,7 +164,7 @@ async def run(self): logger.debug(f"Startup state: {startup_state}") # NOTE: State snapshot is immediately out of date after init - # Send startup state to app + # Send startup state to bot if ( result := await run_taskiq_task_wait_result( self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT), startup_state @@ -178,7 +178,7 @@ async def run(self): # Initialize recorder (if available) if self.recorder: - await self.recorder.init(app_id=self.app.identifier) + await self.recorder.init(bot_id=self.bot.identifier) # Execute Silverback startup task before we init the rest startup_taskdata_result = await run_taskiq_task_wait_result( @@ -260,7 +260,7 @@ async def run(self): # NOTE: All listener tasks are shut down now - # Execute Silverback shutdown task(s) before shutting down the broker and app + # Execute Silverback shutdown task(s) before shutting down the broker and bot shutdown_taskdata_result = await run_taskiq_task_wait_result( self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.SHUTDOWN ) @@ -293,16 +293,16 @@ async def run(self): # Do one last checkpoint to save a snapshot of final state await self._checkpoint() - await self.app.broker.shutdown() # Release broker + await self.bot.broker.shutdown() # Release broker class WebsocketRunner(BaseRunner, ManagerAccessMixin): """ - Run a single app against a live network using a basic in-memory queue and websockets. + Run a single bot against a live network using a basic in-memory queue and websockets. """ - def __init__(self, app: SilverbackApp, *args, **kwargs): - super().__init__(app, *args, **kwargs) + def __init__(self, bot: SilverbackBot, *args, **kwargs): + super().__init__(bot, *args, **kwargs) # Check for websocket support if not (ws_uri := self.chain_manager.provider.ws_uri): @@ -357,14 +357,14 @@ async def run(self): class PollingRunner(BaseRunner, ManagerAccessMixin): """ - Run a single app against a live network using a basic in-memory queue. + Run a single bot against a live network using a basic in-memory queue. """ # TODO: Move block_timeout settings to Ape core config # TODO: Merge polling/websocket subscriptions downstream in Ape core - def __init__(self, app: SilverbackApp, *args, **kwargs): - super().__init__(app, *args, **kwargs) + def __init__(self, bot: SilverbackBot, *args, **kwargs): + super().__init__(bot, *args, **kwargs) logger.warning( "The polling runner makes a significant amount of requests. " "Do not use in production over long time periods unless you know what you're doing." @@ -373,13 +373,13 @@ def __init__(self, app: SilverbackApp, *args, **kwargs): async def _block_task(self, task_data: TaskData): new_block_task_kicker = self._create_task_kicker(task_data) - if block_settings := self.app.poll_settings.get("_blocks_"): + if block_settings := self.bot.poll_settings.get("_blocks_"): new_block_timeout = block_settings.get("new_block_timeout") else: new_block_timeout = None new_block_timeout = ( - new_block_timeout if new_block_timeout is not None else self.app.new_block_timeout + new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout ) async for block in async_wrap_iter( chain.blocks.poll_blocks( @@ -401,13 +401,13 @@ async def _event_task(self, task_data: TaskData): event_abi = EventABI.from_signature(event_signature) event_log_task_kicker = self._create_task_kicker(task_data) - if address_settings := self.app.poll_settings.get(contract_address): + if address_settings := self.bot.poll_settings.get(contract_address): new_block_timeout = address_settings.get("new_block_timeout") else: new_block_timeout = None new_block_timeout = ( - new_block_timeout if new_block_timeout is not None else self.app.new_block_timeout + new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout ) async for event in async_wrap_iter( self.provider.poll_logs( diff --git a/silverback/settings.py b/silverback/settings.py index f4c5366c..7b756484 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -19,14 +19,14 @@ class Settings(BaseSettings, ManagerAccessMixin): """ - Settings for the Silverback app. + Settings for the Silverback bot. Can override these settings from a default state, typically for advanced testing or deployment purposes. Defaults to a working in-memory broker. """ # A unique identifier for this silverback instance - APP_NAME: str = "bot" + BOT_NAME: str = "bot" BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" # To be deprecated in 0.6 @@ -122,7 +122,7 @@ def get_signer(self) -> AccountAPI | None: acct_idx = int(alias.replace("TEST::", "")) return self.account_manager.test_accounts[acct_idx] - # NOTE: Will only have a signer if assigned one here (or in app) + # NOTE: Will only have a signer if assigned one here (or in bot) signer = self.account_manager.load(alias) # NOTE: Set autosign if it's a keyfile account (for local testing) diff --git a/silverback/state.py b/silverback/state.py index 412fc3f6..22591ffe 100644 --- a/silverback/state.py +++ b/silverback/state.py @@ -17,30 +17,30 @@ class StateSnapshot(BaseModel): last_updated: UTCTimestamp = Field(default_factory=utc_now) -class AppDatastore: +class Datastore: """ - Very basic implementation used to store application state and handler result data by + Very basic implementation used to store bot state and handler result data by storing/retreiving state from a JSON-encoded file. - The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` + The file structure that this Recorder uses leverages the value of `SILVERBACK_BOT_NAME` as well as the configured network to determine the location where files get saved: ./.silverback-sessions/ - / + / / state.json # always write here Note that this format can be read by basic means (even in a JS frontend): - You may also want to give your app a unique name so the data does not get overwritten, - if you are using multiple apps from the same directory: + You may also want to give your bot a unique name so the data does not get overwritten, + if you are using multiple bots from the same directory: - - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name + - `SILVERBACK_BOT_NAME`: Any alphabetical string valid as a folder name """ - async def init(self, app_id: SilverbackID) -> StateSnapshot | None: + async def init(self, bot_id: SilverbackID) -> StateSnapshot | None: data_folder = ( - Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network + Path.cwd() / ".silverback-sessions" / bot_id.name / bot_id.ecosystem / bot_id.network ) data_folder.mkdir(parents=True, exist_ok=True) self.state_backup_file = data_folder / "state.json"