From 469219bf964a099a4a8615cd4d21bfd8ef1fdd14 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 30 Apr 2026 02:49:24 +0000 Subject: [PATCH 01/18] Add example_atproto_plugins: live JetStream sample Mirrors the existing example_plugins/example_rules pattern with a custom register_input_stream hook that subscribes to Bluesky's JetStream WebSocket firehose. Lets contributors exercise Osprey against real production-rate event volume without the synthetic 1-event/sec generator. Stack docker-compose.atproto.yaml on top of the main compose file (or run ./run-atproto.sh) to use it. Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 4 +- docker-compose.atproto.yaml | 29 +++++ example_atproto_plugins/README.md | 51 ++++++++ example_atproto_plugins/__init__.py | 0 example_atproto_plugins/pyproject.toml | 18 +++ .../src/atproto_plugin/__init__.py | 0 .../atproto_plugin/jetstream_input_stream.py | 122 ++++++++++++++++++ .../src/atproto_plugin/register_plugins.py | 14 ++ example_atproto_rules/config/labels.yaml | 5 + example_atproto_rules/main.sml | 3 + example_atproto_rules/models/base.sml | 25 ++++ .../rules/post_contains_test.sml | 20 +++ osprey_worker/Dockerfile | 7 +- pyproject.toml | 7 +- run-atproto.sh | 5 + uv.lock | 25 ++++ 16 files changed, 330 insertions(+), 5 deletions(-) create mode 100644 docker-compose.atproto.yaml create mode 100644 example_atproto_plugins/README.md create mode 100644 example_atproto_plugins/__init__.py create mode 100644 example_atproto_plugins/pyproject.toml create mode 100644 example_atproto_plugins/src/atproto_plugin/__init__.py create mode 100644 example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py create mode 100644 example_atproto_plugins/src/atproto_plugin/register_plugins.py create mode 100644 example_atproto_rules/config/labels.yaml create mode 100644 example_atproto_rules/main.sml create mode 100644 example_atproto_rules/models/base.sml create mode 100644 example_atproto_rules/rules/post_contains_test.sml create 
mode 100755 run-atproto.sh diff --git a/AGENTS.md b/AGENTS.md index 2912a473..ff0c48c6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,14 +12,16 @@ Top-level modules: - `osprey_coordinator/` — Rust gRPC coordinator (tokio, tonic, etcd, rdkafka). Rust code belongs here. - `proto/osprey/rpc/` — protobuf source of truth for `osprey_rpc` and `osprey_coordinator` types. - `example_plugins/` — reference plugins (UDFs, output sinks, labels service) using the pluggy-based plugin system. Do not add production code here. +- `example_atproto_plugins/` — reference plugin demonstrating a custom input stream that consumes Bluesky's ATProto JetStream firehose. Stack `docker-compose.atproto.yaml` on top of the main compose file (or use `./run-atproto.sh`) to run Osprey against live ATProto traffic. Do not add production code here. - `example_rules/` — sample SML rules and YAML config. +- `example_atproto_rules/` — sample SML rules paired with `example_atproto_plugins/`. Reference files: `docs/DEVELOPMENT.md` (setup), `example_plugins/src/register_plugins.py` (plugin patterns), `example_plugins/src/services/labels_service.py` (labels service example). ## Design - API: gRPC between `osprey_coordinator` and workers; HTTP/Flask for `osprey-ui-api` (port 5004); protobuf definitions under `proto/osprey/rpc/` are authoritative. -- Rules: SML (Osprey's rule language) with user-defined functions registered via pluggy hooks (`@hookimpl_osprey`): `register_udfs`, `register_output_sinks`, `register_labels_service_or_provider`. +- Rules: SML (Osprey's rule language) with user-defined functions registered via pluggy hooks (`@hookimpl_osprey`): `register_udfs`, `register_output_sinks`, `register_labels_service_or_provider`, `register_input_stream` (custom event source; see `example_atproto_plugins/`). - Data model conventions: Pydantic for models, SQLAlchemy for persistence (versions pinned in `pyproject.toml`). 
## Build and run diff --git a/docker-compose.atproto.yaml b/docker-compose.atproto.yaml new file mode 100644 index 00000000..cc60c926 --- /dev/null +++ b/docker-compose.atproto.yaml @@ -0,0 +1,29 @@ +# Override that swaps the synthetic Kafka producer for the live Bluesky JetStream +# firehose, via the example_atproto_plugins package. Stack on top of the main +# compose file: +# +# docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml up +# +# Or use the convenience wrapper: ./run-atproto.sh +services: + osprey-worker: + environment: + OSPREY_INPUT_STREAM_SOURCE: plugin + OSPREY_RULES_PATH: ./example_atproto_rules + ports: !reset [] + volumes: + - ./example_atproto_rules:/osprey/example_atproto_rules + - ./example_atproto_plugins:/osprey/example_atproto_plugins + + osprey-ui-api: + environment: + OSPREY_RULES_PATH: /osprey/example_atproto_rules + volumes: + - ./example_atproto_rules:/osprey/example_atproto_rules + + # Avoid host-port conflicts with supervisord-managed services in + # the Discord monorepo dev environment (which often holds :9000 and :8082). + # Worker -> minio uses the internal docker network, so dropping the + # host bindings doesn't affect the sample. + minio: + ports: !reset [] diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md new file mode 100644 index 00000000..e472cae7 --- /dev/null +++ b/example_atproto_plugins/README.md @@ -0,0 +1,51 @@ +# example_atproto_plugins + +A sample Osprey plugin that consumes Bluesky's [JetStream](https://docs.bsky.app/blog/jetstream) — the public ATProto firehose — as the input event source. + +This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. 
Out of the box it gives you: + +- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s for ATProto commits (posts, likes, reposts, follows by default), +- realistic per-second event volume from the live Bluesky network — useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, +- a companion `example_atproto_rules/` tree showing how to write rules against ATProto event shapes. + +## Running + +From the repo root: + +```sh +./run-atproto.sh +``` + +Under the hood this stacks `docker-compose.atproto.yaml` on top of the main compose file, swapping the worker's input source from Kafka to the JetStream plugin and pointing it at `example_atproto_rules` instead of `example_rules`. + +## Configuration + +| Env var | Default | Description | +| --- | --- | --- | +| `OSPREY_INPUT_STREAM_SOURCE` | (must be) `plugin` | Selects the plugin-provided stream. | +| `OSPREY_JETSTREAM_ENDPOINT` | `wss://jetstream2.us-west.bsky.network/subscribe` | JetStream WebSocket URL. | +| `OSPREY_JETSTREAM_WANTED_COLLECTIONS` | `app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow` | Comma-separated collections to subscribe to (server-side filter). | + +## Action shape + +Each commit is mapped to an `Action` with: + +``` +data = { + 'did': '', + 'collection': 'app.bsky.feed.post', + 'operation': 'create' | 'update' | 'delete', + 'rkey': '', + 'rev': '', + 'cid': '', + 'event_type': 'create_post', # _ + 'record': { ... raw ATProto record ... }, +} +``` + +Identity and account events are skipped — this sample is concerned with content events. + +## Caveats + +- **Not production-ready.** No durable cursor / resume on restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. +- **Schema drift.** Bluesky has not committed to JetStream as a stable long-term API. Treat this as illustrative. 
diff --git a/example_atproto_plugins/__init__.py b/example_atproto_plugins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/pyproject.toml b/example_atproto_plugins/pyproject.toml new file mode 100644 index 00000000..246b1a0f --- /dev/null +++ b/example_atproto_plugins/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "example_atproto_plugins" +version = "0.1.0" +description = "Example Osprey plugin that consumes Bluesky's ATProto JetStream firehose" +requires-python = ">=3.11" +dependencies = [ + "pluggy==1.5.0", + "websocket-client==1.8.0", +] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] + +[project.entry-points.osprey_plugin] +atproto_plugins = "atproto_plugin.register_plugins" diff --git a/example_atproto_plugins/src/atproto_plugin/__init__.py b/example_atproto_plugins/src/atproto_plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py new file mode 100644 index 00000000..38b37c95 --- /dev/null +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -0,0 +1,122 @@ +import json +import time +from datetime import datetime, timezone +from typing import Any, Dict, Iterator, List, Optional +from urllib.parse import urlencode + +import sentry_sdk +import websocket +from osprey.engine.executor.execution_context import Action +from osprey.worker.lib.instruments import metrics +from osprey.worker.lib.osprey_shared.logging import get_logger +from osprey.worker.sinks.sink.input_stream import BaseInputStream +from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext + +logger = get_logger() + + +DEFAULT_ENDPOINT = 'wss://jetstream2.us-west.bsky.network/subscribe' +DEFAULT_COLLECTIONS = ( + 'app.bsky.feed.post', + 'app.bsky.feed.like', + 
'app.bsky.feed.repost', + 'app.bsky.graph.follow', +) + + +class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): + """Subscribes to Bluesky's ATProto JetStream WebSocket and yields Osprey Actions. + + JetStream is Bluesky's public real-time firehose for ATProto network activity emitted as + plain JSON (no CBOR/CAR), making it a convenient high-volume source for exercising Osprey + against real production traffic. See https://docs.bsky.app/blog/jetstream. + + Each ATProto commit is mapped to an Osprey :class:`Action` whose data dictionary exposes + the actor DID, collection, operation, and the raw record. Identity and account events are + skipped — the sample is concerned with content events. + """ + + def __init__( + self, + endpoint: Optional[str] = None, + wanted_collections: Optional[List[str]] = None, + reconnect_seconds: float = 2.0, + ): + super().__init__() + self._endpoint = endpoint or DEFAULT_ENDPOINT + self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) + self._reconnect_seconds = reconnect_seconds + + def _build_url(self) -> str: + params = [('wantedCollections', c) for c in self._wanted_collections] + return f'{self._endpoint}?{urlencode(params)}' + + def _gen(self) -> Iterator[BaseAckingContext[Action]]: + url = self._build_url() + while True: + ws: Optional[websocket.WebSocket] = None + try: + logger.info(f'Connecting to JetStream at {url}') + ws = websocket.create_connection(url) + logger.info('JetStream connection established') + while True: + raw = ws.recv() + if not raw: + continue + try: + event = json.loads(raw) + except json.JSONDecodeError: + logger.warning(f'Failed to parse JetStream event: {raw[:200]!r}') + continue + + action = _event_to_action(event) + if action is None: + continue + metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) + yield NoopAckingContext(action) + except Exception as e: + logger.exception(f'JetStream 
stream error; reconnecting in {self._reconnect_seconds}s: {e}') + sentry_sdk.capture_exception(e) + finally: + if ws is not None: + try: + ws.close() + except Exception: + pass + time.sleep(self._reconnect_seconds) + + +def _event_to_action(event: Dict[str, Any]) -> Optional[Action]: + """Map a JetStream event dict into an Osprey :class:`Action`, or return ``None`` to skip it.""" + if event.get('kind') != 'commit': + return None + commit = event.get('commit') or {} + operation = commit.get('operation') + collection = commit.get('collection', '') or '' + did = event.get('did', '') or '' + time_us = event.get('time_us') + if not time_us: + return None + + short = collection.rsplit('.', 1)[-1] if collection else 'unknown' + action_name = f'{operation}_{short}' if operation else short + + record = commit.get('record') or {} + + data: Dict[str, Any] = { + 'did': did, + 'collection': collection, + 'operation': operation, + 'rkey': commit.get('rkey'), + 'rev': commit.get('rev'), + 'cid': commit.get('cid'), + 'event_type': action_name, + 'record': record, + } + + return Action( + action_id=int(time_us), + action_name=action_name, + data=data, + timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), + ) diff --git a/example_atproto_plugins/src/atproto_plugin/register_plugins.py b/example_atproto_plugins/src/atproto_plugin/register_plugins.py new file mode 100644 index 00000000..34514c5e --- /dev/null +++ b/example_atproto_plugins/src/atproto_plugin/register_plugins.py @@ -0,0 +1,14 @@ +from atproto_plugin.jetstream_input_stream import JetStreamInputStream +from osprey.engine.executor.execution_context import Action +from osprey.worker.adaptor.plugin_manager import hookimpl_osprey +from osprey.worker.lib.config import Config +from osprey.worker.sinks.sink.input_stream import BaseInputStream +from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext + + +@hookimpl_osprey +def register_input_stream(config: Config) -> 
BaseInputStream[BaseAckingContext[Action]]: + endpoint = config.get_optional_str('OSPREY_JETSTREAM_ENDPOINT') + raw_collections = config.get_optional_str('OSPREY_JETSTREAM_WANTED_COLLECTIONS') + wanted = [c.strip() for c in raw_collections.split(',') if c.strip()] if raw_collections else None + return JetStreamInputStream(endpoint=endpoint, wanted_collections=wanted) diff --git a/example_atproto_rules/config/labels.yaml b/example_atproto_rules/config/labels.yaml new file mode 100644 index 00000000..69455774 --- /dev/null +++ b/example_atproto_rules/config/labels.yaml @@ -0,0 +1,5 @@ +labels: + test_poster: + valid_for: [Did] + connotation: neutral + description: ATProto user who posted a record containing the word 'test' diff --git a/example_atproto_rules/main.sml b/example_atproto_rules/main.sml new file mode 100644 index 00000000..af31184a --- /dev/null +++ b/example_atproto_rules/main.sml @@ -0,0 +1,3 @@ +Import(rules=['models/base.sml']) + +Require(rule='rules/post_contains_test.sml') diff --git a/example_atproto_rules/models/base.sml b/example_atproto_rules/models/base.sml new file mode 100644 index 00000000..53a91ccd --- /dev/null +++ b/example_atproto_rules/models/base.sml @@ -0,0 +1,25 @@ +Did: Entity[str] = EntityJson( + type='Did', + path='$.did', + coerce_type=True +) + +Collection: Entity[str] = EntityJson( + type='Collection', + path='$.collection', + coerce_type=True +) + +EventType: Entity[str] = EntityJson( + type='EventType', + path='$.event_type', + coerce_type=True +) + +PostText: Entity[str] = EntityJson( + type='PostText', + path='$.record.text', + coerce_type=True +) + +ActionName=GetActionName() diff --git a/example_atproto_rules/rules/post_contains_test.sml b/example_atproto_rules/rules/post_contains_test.sml new file mode 100644 index 00000000..5424020c --- /dev/null +++ b/example_atproto_rules/rules/post_contains_test.sml @@ -0,0 +1,20 @@ +Import( + rules=[ + 'models/base.sml', + ] +) + +PostContainsTest = Rule( + when_all=[ + EventType == 
'create_post', + TextContains(text=PostText, phrase='test') + ], + description='ATProto post contains the word "test"', +) + +WhenRules( + rules_any=[PostContainsTest], + then=[ + LabelAdd(entity=Did, label='test_poster'), + ], +) diff --git a/osprey_worker/Dockerfile b/osprey_worker/Dockerfile index 39f9aa22..b2ee0f89 100644 --- a/osprey_worker/Dockerfile +++ b/osprey_worker/Dockerfile @@ -35,10 +35,11 @@ ADD LICENSE.md /osprey/LICENSE.md ADD osprey_rpc/pyproject.toml /osprey/osprey_rpc/pyproject.toml ADD osprey_worker/pyproject.toml /osprey/osprey_worker/pyproject.toml ADD example_plugins/pyproject.toml /osprey/example_plugins/pyproject.toml +ADD example_atproto_plugins/pyproject.toml /osprey/example_atproto_plugins/pyproject.toml # Create minimal package structure required by uv -RUN mkdir -p /osprey/osprey_worker /osprey/osprey_rpc /osprey/example_plugins/src && \ - touch /osprey/osprey_worker/__init__.py /osprey/osprey_rpc/__init__.py /osprey/example_plugins/src/__init__.py +RUN mkdir -p /osprey/osprey_worker /osprey/osprey_rpc /osprey/example_plugins/src /osprey/example_atproto_plugins/src/atproto_plugin && \ + touch /osprey/osprey_worker/__init__.py /osprey/osprey_rpc/__init__.py /osprey/example_plugins/src/__init__.py /osprey/example_atproto_plugins/src/atproto_plugin/__init__.py # Install dependencies first (this layer will be cached when only source code changes) RUN pip install --upgrade pip uv && \ @@ -50,9 +51,11 @@ RUN . 
.venv/bin/activate && update-tld-names # Add source code after dependencies are installed ADD example_rules /osprey/example_rules +ADD example_atproto_rules /osprey/example_atproto_rules ADD osprey_worker /osprey/osprey_worker ADD osprey_rpc /osprey/osprey_rpc ADD example_plugins /osprey/example_plugins +ADD example_atproto_plugins /osprey/example_atproto_plugins COPY entrypoint.sh /osprey/entrypoint.sh diff --git a/pyproject.toml b/pyproject.toml index 503445bb..df826a29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,12 +133,14 @@ nostril = { url = "https://github.com/casics/nostril/archive/v1.2.0.tar.gz" } osprey_rpc = { workspace = true } osprey_worker = { workspace = true } example_plugins = { workspace = true } +example_atproto_plugins = { workspace = true } [tool.uv.workspace] members = [ "osprey_rpc", "osprey_worker", "example_plugins", + "example_atproto_plugins", ] @@ -171,10 +173,10 @@ ignore = [ ] [tool.ruff.lint.isort] -known-first-party = ["osprey_worker", "osprey_rpc", "example_plugins"] +known-first-party = ["osprey_worker", "osprey_rpc", "example_plugins", "example_atproto_plugins"] [tool.fawltydeps] -code = ["osprey_worker/src", "osprey_rpc/src", "example_plugins/src"] +code = ["osprey_worker/src", "osprey_rpc/src", "example_plugins/src", "example_atproto_plugins/src"] deps = ["pyproject.toml"] ignore_unused = [ # Type stubs: used by mypy, never imported directly @@ -239,6 +241,7 @@ mypy_path = [ "osprey_rpc/src", "osprey_worker/src", "example_plugins/src", + "example_atproto_plugins/src", ] # Strict mode includes the following flags. When these are all True, they can be replaced with strict mode. diff --git a/run-atproto.sh b/run-atproto.sh new file mode 100755 index 00000000..ba299324 --- /dev/null +++ b/run-atproto.sh @@ -0,0 +1,5 @@ +#!/bin/bash +# Run Osprey against the live Bluesky JetStream firehose by stacking +# docker-compose.atproto.yaml on top of the main compose file. 
+set -e +exec docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml "${@:-up}" diff --git a/uv.lock b/uv.lock index d410aafb..4c8915c1 100644 --- a/uv.lock +++ b/uv.lock @@ -21,6 +21,7 @@ resolution-markers = [ [manifest] members = [ + "example-atproto-plugins", "example-plugins", "osprey-rpc", "osprey-worker", @@ -495,6 +496,21 @@ dependencies = [ { name = "six" }, ] +[[package]] +name = "example-atproto-plugins" +version = "0.1.0" +source = { editable = "example_atproto_plugins" } +dependencies = [ + { name = "pluggy" }, + { name = "websocket-client" }, +] + +[package.metadata] +requires-dist = [ + { name = "pluggy", specifier = "==1.5.0" }, + { name = "websocket-client", specifier = "==1.8.0" }, +] + [[package]] name = "example-plugins" version = "0.1.0" @@ -2554,6 +2570,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" }, ] +[[package]] +name = "websocket-client" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e6/30/fba0d96b4b5fbf5948ed3f4681f7da2f9f64512e1d303f94b4cc174c24a5/websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da", size = 54648, upload-time = "2024-04-23T22:16:16.976Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/84/44687a29792a70e111c5c477230a72c4b957d88d16141199bf9acb7537a3/websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526", size = 58826, upload-time = "2024-04-23T22:16:14.422Z" }, +] + [[package]] name = "werkzeug" version = "1.0.1" From 2c7edba310d931f68be0919119ec844d2eb53534 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 30 Apr 2026 16:28:01 
+0000 Subject: [PATCH 02/18] Address PR review: align with atproto-ruleset structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Action data shape now mirrors haileyok/atproto-ruleset: $.did, $.operation.{action,collection,path,cid,record}, $.eventMetadata. Identity events become action_name='identity'; commit events become 'operation#'. - Rules tree restructured to match the index.sml routing pattern: main.sml → rules/index.sml → rules/record/index.sml → rules/record/post/index.sml → individual rule files. Models split into models/{base,record/base,record/post}.sml. - Use required=False on entities so non-applicable events don't emit extraction errors (eliminates __error_count noise on non-post events). - Snowflake-generated action_id via batched generate_snowflake_batch (250 IDs per call) instead of time_us, since collisions are possible at sustained throughput. - Add unit tests for _event_to_action covering posts, likes, deletes, identity, account skip, missing time_us, unknown kinds. - Add app.bsky.actor.profile to default subscribed collections. - Tighten per-event try/except so a malformed event no longer tears down the WebSocket connection. - Drop the Discord-monorepo-specific port-reset block from docker-compose.atproto.yaml. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 2 +- docker-compose.atproto.yaml | 8 -- example_atproto_plugins/README.md | 44 ++++--- .../atproto_plugin/jetstream_input_stream.py | 108 +++++++++++------- .../src/atproto_plugin/register_plugins.py | 3 +- example_atproto_plugins/tests/__init__.py | 0 .../tests/test_jetstream_input_stream.py | 105 +++++++++++++++++ example_atproto_rules/config/labels.yaml | 4 +- example_atproto_rules/main.sml | 2 +- example_atproto_rules/models/base.sml | 39 ++++--- example_atproto_rules/models/record/base.sml | 23 ++++ example_atproto_rules/models/record/post.sml | 7 ++ example_atproto_rules/rules/index.sml | 6 + .../rules/post_contains_test.sml | 20 ---- example_atproto_rules/rules/record/index.sml | 11 ++ .../rules/record/post/index.sml | 9 ++ .../rules/record/post/post_contains_test.sml | 21 ++++ 17 files changed, 310 insertions(+), 102 deletions(-) create mode 100644 example_atproto_plugins/tests/__init__.py create mode 100644 example_atproto_plugins/tests/test_jetstream_input_stream.py create mode 100644 example_atproto_rules/models/record/base.sml create mode 100644 example_atproto_rules/models/record/post.sml create mode 100644 example_atproto_rules/rules/index.sml delete mode 100644 example_atproto_rules/rules/post_contains_test.sml create mode 100644 example_atproto_rules/rules/record/index.sml create mode 100644 example_atproto_rules/rules/record/post/index.sml create mode 100644 example_atproto_rules/rules/record/post/post_contains_test.sml diff --git a/AGENTS.md b/AGENTS.md index ff0c48c6..e65e58c4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,7 +12,7 @@ Top-level modules: - `osprey_coordinator/` — Rust gRPC coordinator (tokio, tonic, etcd, rdkafka). Rust code belongs here. - `proto/osprey/rpc/` — protobuf source of truth for `osprey_rpc` and `osprey_coordinator` types. - `example_plugins/` — reference plugins (UDFs, output sinks, labels service) using the pluggy-based plugin system. 
Do not add production code here. -- `example_atproto_plugins/` — reference plugin demonstrating a custom input stream that consumes Bluesky's ATProto JetStream firehose. Stack `docker-compose.atproto.yaml` on top of the main compose file (or use `./run-atproto.sh`) to run Osprey against live ATProto traffic. Do not add production code here. +- `example_atproto_plugins/` — reference plugin demonstrating a custom input stream that consumes the Bluesky firehose. Stack `docker-compose.atproto.yaml` on top of the main compose file (or use `./run-atproto.sh`) to run Osprey against live ATProto traffic. Do not add production code here. - `example_rules/` — sample SML rules and YAML config. - `example_atproto_rules/` — sample SML rules paired with `example_atproto_plugins/`. diff --git a/docker-compose.atproto.yaml b/docker-compose.atproto.yaml index cc60c926..c26469e6 100644 --- a/docker-compose.atproto.yaml +++ b/docker-compose.atproto.yaml @@ -10,7 +10,6 @@ services: environment: OSPREY_INPUT_STREAM_SOURCE: plugin OSPREY_RULES_PATH: ./example_atproto_rules - ports: !reset [] volumes: - ./example_atproto_rules:/osprey/example_atproto_rules - ./example_atproto_plugins:/osprey/example_atproto_plugins @@ -20,10 +19,3 @@ services: OSPREY_RULES_PATH: /osprey/example_atproto_rules volumes: - ./example_atproto_rules:/osprey/example_atproto_rules - - # Avoid host-port conflicts with supervisord-managed services in - # the Discord monorepo dev environment (which often holds :9000 and :8082). - # Worker -> minio uses the internal docker network, so dropping the - # host bindings doesn't affect the sample. 
- minio: - ports: !reset [] diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index e472cae7..62eff838 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -2,11 +2,11 @@ A sample Osprey plugin that consumes Bluesky's [JetStream](https://docs.bsky.app/blog/jetstream) — the public ATProto firehose — as the input event source. -This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. Out of the box it gives you: +This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. The Action shape it emits is intended to be source-compatible with [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset), so rules written for that ruleset can be pointed at this stream without translation. Out of the box it gives you: -- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s for ATProto commits (posts, likes, reposts, follows by default), +- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s for ATProto identity and commit events, - realistic per-second event volume from the live Bluesky network — useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, -- a companion `example_atproto_rules/` tree showing how to write rules against ATProto event shapes. +- a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes. ## Running @@ -24,28 +24,46 @@ Under the hood this stacks `docker-compose.atproto.yaml` on top of the main comp | --- | --- | --- | | `OSPREY_INPUT_STREAM_SOURCE` | (must be) `plugin` | Selects the plugin-provided stream. 
| | `OSPREY_JETSTREAM_ENDPOINT` | `wss://jetstream2.us-west.bsky.network/subscribe` | JetStream WebSocket URL. | -| `OSPREY_JETSTREAM_WANTED_COLLECTIONS` | `app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow` | Comma-separated collections to subscribe to (server-side filter). | +| `OSPREY_JETSTREAM_WANTED_COLLECTIONS` | `app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow,app.bsky.actor.profile` | Comma-separated collections to subscribe to (server-side filter). | ## Action shape -Each commit is mapped to an `Action` with: +### Commit events + +JetStream `commit` events become `action_name='operation#'` (e.g., `operation#create`): + +``` +data = { + 'did': '', + 'operation': { + 'action': 'create' | 'update' | 'delete', + 'collection': 'app.bsky.feed.post', + 'path': '/', + 'cid': '', + 'record': { ... raw ATProto record ... }, + }, + 'eventMetadata': {}, # reserved for enrichment (handle, profile, account age, ...) +} +``` + +### Identity events + +JetStream `identity` events become `action_name='identity'`: ``` data = { 'did': '', - 'collection': 'app.bsky.feed.post', - 'operation': 'create' | 'update' | 'delete', - 'rkey': '', - 'rev': '', - 'cid': '', - 'event_type': 'create_post', # _ - 'record': { ... raw ATProto record ... }, + 'identity': {'handle': ''}, + 'eventMetadata': {}, } ``` -Identity and account events are skipped — this sample is concerned with content events. +Account events and any other kind are skipped. + +`action_id` is generated by the in-repo `snowflake-id-worker` (batched 250 IDs per HTTP call). ## Caveats - **Not production-ready.** No durable cursor / resume on restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. 
+- **No event enrichment.** `eventMetadata` is left empty; rules that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) need a separate enrichment service. - **Schema drift.** Bluesky has not committed to JetStream as a stable long-term API. Treat this as illustrative. diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 38b37c95..1c821391 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -9,6 +9,7 @@ from osprey.engine.executor.execution_context import Action from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger +from osprey.worker.lib.snowflake import generate_snowflake_batch from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext @@ -21,7 +22,9 @@ 'app.bsky.feed.like', 'app.bsky.feed.repost', 'app.bsky.graph.follow', + 'app.bsky.actor.profile', ) +SNOWFLAKE_BATCH_SIZE = 250 class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): @@ -31,9 +34,11 @@ class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): plain JSON (no CBOR/CAR), making it a convenient high-volume source for exercising Osprey against real production traffic. See https://docs.bsky.app/blog/jetstream. - Each ATProto commit is mapped to an Osprey :class:`Action` whose data dictionary exposes - the actor DID, collection, operation, and the raw record. Identity and account events are - skipped — the sample is concerned with content events. 
+ The Action shape this stream emits is intended to be source-compatible with + https://github.com/haileyok/atproto-ruleset, so rules written against that ruleset can be + pointed at this input stream without translation. Identity events become + ``action_name='identity'``; commit events become ``operation#``. + Account events are skipped — the sample is concerned with content events. """ def __init__( @@ -46,11 +51,18 @@ def __init__( self._endpoint = endpoint or DEFAULT_ENDPOINT self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) self._reconnect_seconds = reconnect_seconds + self._snowflake_buffer: List[int] = [] def _build_url(self) -> str: params = [('wantedCollections', c) for c in self._wanted_collections] return f'{self._endpoint}?{urlencode(params)}' + def _next_action_id(self) -> int: + if not self._snowflake_buffer: + batch = generate_snowflake_batch(count=SNOWFLAKE_BATCH_SIZE, retries=3) + self._snowflake_buffer = [s.to_int() for s in batch] + return self._snowflake_buffer.pop() + def _gen(self) -> Iterator[BaseAckingContext[Action]]: url = self._build_url() while True: @@ -65,11 +77,11 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: continue try: event = json.loads(raw) - except json.JSONDecodeError: - logger.warning(f'Failed to parse JetStream event: {raw[:200]!r}') + action = _event_to_action(event, action_id=self._next_action_id()) + except Exception: + logger.exception('skipping malformed JetStream event') + sentry_sdk.capture_exception() continue - - action = _event_to_action(event) if action is None: continue metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) @@ -82,41 +94,57 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: try: ws.close() except Exception: - pass + logger.debug('ignored error while closing JetStream socket', exc_info=True) time.sleep(self._reconnect_seconds) -def _event_to_action(event: Dict[str, Any]) -> 
Optional[Action]: - """Map a JetStream event dict into an Osprey :class:`Action`, or return ``None`` to skip it.""" - if event.get('kind') != 'commit': - return None - commit = event.get('commit') or {} - operation = commit.get('operation') - collection = commit.get('collection', '') or '' - did = event.get('did', '') or '' +def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: + """Map a JetStream event dict into an Osprey :class:`Action`, or return ``None`` to skip it. + + Identity events become ``action_name='identity'``; commit events become + ``operation#`` with the body under ``$.operation.*``. Account events + and any other event kind are skipped. + """ time_us = event.get('time_us') - if not time_us: + if time_us is None: return None - - short = collection.rsplit('.', 1)[-1] if collection else 'unknown' - action_name = f'{operation}_{short}' if operation else short - - record = commit.get('record') or {} - - data: Dict[str, Any] = { - 'did': did, - 'collection': collection, - 'operation': operation, - 'rkey': commit.get('rkey'), - 'rev': commit.get('rev'), - 'cid': commit.get('cid'), - 'event_type': action_name, - 'record': record, - } - - return Action( - action_id=int(time_us), - action_name=action_name, - data=data, - timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), - ) + timestamp = datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc) + did = event.get('did') or '' + + kind = event.get('kind') + if kind == 'identity': + identity = event.get('identity') or {} + return Action( + action_id=action_id, + action_name='identity', + data={ + 'did': did, + 'identity': {'handle': identity.get('handle')}, + 'eventMetadata': {}, + }, + timestamp=timestamp, + ) + + if kind == 'commit': + commit = event.get('commit') or {} + operation = commit.get('operation') + collection = commit.get('collection') or '' + rkey = commit.get('rkey') or '' + return Action( + action_id=action_id, + 
action_name=f'operation#{operation}' if operation else 'operation', + data={ + 'did': did, + 'operation': { + 'action': operation, + 'collection': collection, + 'path': f'{collection}/{rkey}' if collection and rkey else '', + 'cid': commit.get('cid'), + 'record': commit.get('record') or {}, + }, + 'eventMetadata': {}, + }, + timestamp=timestamp, + ) + + return None diff --git a/example_atproto_plugins/src/atproto_plugin/register_plugins.py b/example_atproto_plugins/src/atproto_plugin/register_plugins.py index 34514c5e..dc6b0736 100644 --- a/example_atproto_plugins/src/atproto_plugin/register_plugins.py +++ b/example_atproto_plugins/src/atproto_plugin/register_plugins.py @@ -1,10 +1,11 @@ -from atproto_plugin.jetstream_input_stream import JetStreamInputStream from osprey.engine.executor.execution_context import Action from osprey.worker.adaptor.plugin_manager import hookimpl_osprey from osprey.worker.lib.config import Config from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext +from atproto_plugin.jetstream_input_stream import JetStreamInputStream + @hookimpl_osprey def register_input_stream(config: Config) -> BaseInputStream[BaseAckingContext[Action]]: diff --git a/example_atproto_plugins/tests/__init__.py b/example_atproto_plugins/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py new file mode 100644 index 00000000..23865504 --- /dev/null +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -0,0 +1,105 @@ +from atproto_plugin.jetstream_input_stream import _event_to_action + +SAMPLE_POST_COMMIT = { + 'did': 'did:plc:abc123', + 'time_us': 1714500000000000, + 'kind': 'commit', + 'commit': { + 'rev': '3kf...', + 'operation': 'create', + 'collection': 'app.bsky.feed.post', + 'rkey': 'abcdefg', + 'cid': 'bafyrei...', + 
'record': { + '$type': 'app.bsky.feed.post', + 'text': 'this is a test post', + 'createdAt': '2024-04-30T12:00:00Z', + }, + }, +} + + +def test_post_commit_maps_to_operation_create(): + action = _event_to_action(SAMPLE_POST_COMMIT, action_id=42) + + assert action is not None + assert action.action_id == 42 + assert action.action_name == 'operation#create' + assert action.data['did'] == 'did:plc:abc123' + op = action.data['operation'] + assert op['action'] == 'create' + assert op['collection'] == 'app.bsky.feed.post' + assert op['path'] == 'app.bsky.feed.post/abcdefg' + assert op['cid'] == 'bafyrei...' + assert op['record']['text'] == 'this is a test post' + assert action.data['eventMetadata'] == {} + + +def test_like_commit_maps_to_operation_create_without_record_text(): + like = { + **SAMPLE_POST_COMMIT, + 'commit': { + **SAMPLE_POST_COMMIT['commit'], + 'collection': 'app.bsky.feed.like', + 'record': {'subject': {'uri': 'at://...', 'cid': 'bafy...'}}, + }, + } + action = _event_to_action(like, action_id=1) + + assert action is not None + assert action.action_name == 'operation#create' + assert action.data['operation']['collection'] == 'app.bsky.feed.like' + assert 'text' not in action.data['operation']['record'] + + +def test_delete_commit_maps_with_empty_record(): + delete = { + **SAMPLE_POST_COMMIT, + 'commit': { + 'operation': 'delete', + 'collection': 'app.bsky.feed.post', + 'rkey': 'abcdefg', + 'rev': '3kf...', + }, + } + action = _event_to_action(delete, action_id=2) + + assert action is not None + assert action.action_name == 'operation#delete' + assert action.data['operation']['action'] == 'delete' + assert action.data['operation']['record'] == {} + + +def test_identity_event_maps_to_identity_action(): + identity = { + 'did': 'did:plc:xyz', + 'time_us': 1714500000000001, + 'kind': 'identity', + 'identity': {'did': 'did:plc:xyz', 'handle': 'someone.bsky.social', 'seq': 100}, + } + action = _event_to_action(identity, action_id=3) + + assert action is not 
None + assert action.action_name == 'identity' + assert action.data['identity']['handle'] == 'someone.bsky.social' + assert action.data['did'] == 'did:plc:xyz' + + +def test_account_event_is_skipped(): + account = { + 'did': 'did:plc:xyz', + 'time_us': 1714500000000002, + 'kind': 'account', + 'account': {'active': True, 'did': 'did:plc:xyz', 'seq': 100}, + } + assert _event_to_action(account, action_id=4) is None + + +def test_event_without_time_us_is_skipped(): + bad = {k: v for k, v in SAMPLE_POST_COMMIT.items() if k != 'time_us'} + assert _event_to_action(bad, action_id=5) is None + + +def test_unknown_kind_is_skipped(): + weird = {'did': 'did:plc:xyz', 'time_us': 1, 'kind': 'something-new'} + assert _event_to_action(weird, action_id=6) is None diff --git a/example_atproto_rules/config/labels.yaml b/example_atproto_rules/config/labels.yaml index 69455774..799d5495 100644 --- a/example_atproto_rules/config/labels.yaml +++ b/example_atproto_rules/config/labels.yaml @@ -1,5 +1,5 @@ labels: - test_poster: - valid_for: [Did] + test-poster: + valid_for: [UserId] connotation: neutral description: ATProto user who posted a record containing the word 'test' diff --git a/example_atproto_rules/main.sml b/example_atproto_rules/main.sml index af31184a..d22a8b63 100644 --- a/example_atproto_rules/main.sml +++ b/example_atproto_rules/main.sml @@ -1,3 +1,3 @@ Import(rules=['models/base.sml']) -Require(rule='rules/post_contains_test.sml') +Require(rule='rules/index.sml') diff --git a/example_atproto_rules/models/base.sml b/example_atproto_rules/models/base.sml index 53a91ccd..6d8b5bb0 100644 --- a/example_atproto_rules/models/base.sml +++ b/example_atproto_rules/models/base.sml @@ -1,25 +1,32 @@ -Did: Entity[str] = EntityJson( - type='Did', +ActionName = GetActionName() + +UserId: Entity[str] = EntityJson( + type='UserId', path='$.did', - coerce_type=True + required=False, ) -Collection: Entity[str] = EntityJson( - type='Collection', - path='$.collection', - coerce_type=True 
+Handle: Entity[str] = EntityJson( + type='Handle', + path='$.eventMetadata.handle', + required=False, ) -EventType: Entity[str] = EntityJson( - type='EventType', - path='$.event_type', - coerce_type=True +PdsHost: Entity[str] = EntityJson( + type='PdsHost', + path='$.eventMetadata.pdsHost', + required=False, ) -PostText: Entity[str] = EntityJson( - type='PostText', - path='$.record.text', - coerce_type=True +OperationKind: Optional[str] = JsonData( + path='$.operation.action', + required=False, ) -ActionName=GetActionName() +IsOperation = OperationKind != None + +Second: int = 1 +Minute: int = Second * 60 +Hour: int = Minute * 60 +Day: int = Hour * 24 +Week: int = Day * 7 diff --git a/example_atproto_rules/models/record/base.sml b/example_atproto_rules/models/record/base.sml new file mode 100644 index 00000000..742cd08b --- /dev/null +++ b/example_atproto_rules/models/record/base.sml @@ -0,0 +1,23 @@ +Import(rules=['models/base.sml']) + +IsCreate = OperationKind == 'create' +IsUpdate = OperationKind == 'update' +IsDelete = OperationKind == 'delete' + +Collection: str = JsonData( + path='$.operation.collection', + required=False, + coerce_type=True, +) + +Path: str = JsonData( + path='$.operation.path', + required=False, + coerce_type=True, +) + +Cid: str = JsonData( + path='$.operation.cid', + required=False, + coerce_type=True, +) diff --git a/example_atproto_rules/models/record/post.sml b/example_atproto_rules/models/record/post.sml new file mode 100644 index 00000000..5636aa3a --- /dev/null +++ b/example_atproto_rules/models/record/post.sml @@ -0,0 +1,7 @@ +Import(rules=['models/base.sml']) + +PostText: str = JsonData( + path='$.operation.record.text', + required=False, + coerce_type=True, +) diff --git a/example_atproto_rules/rules/index.sml b/example_atproto_rules/rules/index.sml new file mode 100644 index 00000000..113cb722 --- /dev/null +++ b/example_atproto_rules/rules/index.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +Require( + 
rule='rules/record/index.sml', + require_if=IsOperation, +) diff --git a/example_atproto_rules/rules/post_contains_test.sml b/example_atproto_rules/rules/post_contains_test.sml deleted file mode 100644 index 5424020c..00000000 --- a/example_atproto_rules/rules/post_contains_test.sml +++ /dev/null @@ -1,20 +0,0 @@ -Import( - rules=[ - 'models/base.sml', - ] -) - -PostContainsTest = Rule( - when_all=[ - EventType == 'create_post', - TextContains(text=PostText, phrase='test') - ], - description='ATProto post contains the word "test"', -) - -WhenRules( - rules_any=[PostContainsTest], - then=[ - LabelAdd(entity=Did, label='test_poster'), - ], -) diff --git a/example_atproto_rules/rules/record/index.sml b/example_atproto_rules/rules/record/index.sml new file mode 100644 index 00000000..0fabb9d5 --- /dev/null +++ b/example_atproto_rules/rules/record/index.sml @@ -0,0 +1,11 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + ], +) + +Require( + rule='rules/record/post/index.sml', + require_if=(IsCreate or IsUpdate) and Collection == 'app.bsky.feed.post', +) diff --git a/example_atproto_rules/rules/record/post/index.sml b/example_atproto_rules/rules/record/post/index.sml new file mode 100644 index 00000000..1d84f15e --- /dev/null +++ b/example_atproto_rules/rules/record/post/index.sml @@ -0,0 +1,9 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + 'models/record/post.sml', + ], +) + +Require(rule='rules/record/post/post_contains_test.sml') diff --git a/example_atproto_rules/rules/record/post/post_contains_test.sml b/example_atproto_rules/rules/record/post/post_contains_test.sml new file mode 100644 index 00000000..886f068c --- /dev/null +++ b/example_atproto_rules/rules/record/post/post_contains_test.sml @@ -0,0 +1,21 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + 'models/record/post.sml', + ], +) + +PostContainsTestRule = Rule( + when_all=[ + TextContains(text=PostText, phrase='test'), + ], + 
description='ATProto post contains the word "test"', +) + +WhenRules( + rules_any=[PostContainsTestRule], + then=[ + LabelAdd(entity=UserId, label='test-poster'), + ], +) From 1f6a8de8cef3d7eb7a82e372d7e62bc1f0605ae7 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 30 Apr 2026 16:38:03 +0000 Subject: [PATCH 03/18] Fix mypy: split per-connection streaming into helper mypy couldn't narrow `ws: Optional[WebSocket]` inside the recv loop because the variable also had to survive into the `finally` block. Moved the inner streaming + close lifecycle into _stream_one_connection, leaving _gen as just the reconnect loop. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../atproto_plugin/jetstream_input_stream.py | 52 ++++++++++--------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 1c821391..82018b75 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -66,37 +66,39 @@ def _next_action_id(self) -> int: def _gen(self) -> Iterator[BaseAckingContext[Action]]: url = self._build_url() while True: - ws: Optional[websocket.WebSocket] = None try: - logger.info(f'Connecting to JetStream at {url}') - ws = websocket.create_connection(url) - logger.info('JetStream connection established') - while True: - raw = ws.recv() - if not raw: - continue - try: - event = json.loads(raw) - action = _event_to_action(event, action_id=self._next_action_id()) - except Exception: - logger.exception('skipping malformed JetStream event') - sentry_sdk.capture_exception() - continue - if action is None: - continue - metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) - yield NoopAckingContext(action) + yield from self._stream_one_connection(url) except Exception as e: logger.exception(f'JetStream 
stream error; reconnecting in {self._reconnect_seconds}s: {e}') sentry_sdk.capture_exception(e) - finally: - if ws is not None: - try: - ws.close() - except Exception: - logger.debug('ignored error while closing JetStream socket', exc_info=True) time.sleep(self._reconnect_seconds) + def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: + logger.info(f'Connecting to JetStream at {url}') + ws = websocket.create_connection(url) + logger.info('JetStream connection established') + try: + while True: + raw = ws.recv() + if not raw: + continue + try: + event = json.loads(raw) + action = _event_to_action(event, action_id=self._next_action_id()) + except Exception: + logger.exception('skipping malformed JetStream event') + sentry_sdk.capture_exception() + continue + if action is None: + continue + metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) + yield NoopAckingContext(action) + finally: + try: + ws.close() + except Exception: + logger.debug('ignored error while closing JetStream socket', exc_info=True) + def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: """Map a JetStream event dict into an Osprey :class:`Action`, or return ``None`` to skip it. From b28092f6ff00c3a1ea88c7d883e10e33754891ad Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 30 Apr 2026 21:39:21 +0000 Subject: [PATCH 04/18] Use JetStream-native paths in sample rules and Action shape MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Earlier review feedback pointed to haileyok/atproto-ruleset for project *structure* — but that ruleset is fed by a separate enrichment pipeline, not JetStream directly, and uses paths like \$.eventMetadata.* and \$.operation.path that JetStream doesn't emit. Mapping JetStream events into that shape was misleading: it pretended to expose enrichment fields that are always null and synthesised paths that don't exist. 
Reverted the data shape so Action.data is the JetStream JSON event passed through unchanged. Rules now read JetStream-native paths: \$.did, \$.kind, \$.commit.{operation,collection,rkey,cid,record}, \$.identity.handle, etc. action_name is the event's kind ('commit' or 'identity'). The file hierarchy from atproto-ruleset (main.sml, models/{base,record/...}, rules/index.sml routing) is preserved. Verified end-to-end on the live worker: - 7/7 unit tests pass in container - 230,392 events processed post-restart, every one with __error_count: 0 - 253,820 commits + 142 identity events emitted with correct paths - 70 PostContainsTestRule positive matches with label mutation UserId/test-poster/1 emitted on posts containing "test" Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 49 ++++++------- .../atproto_plugin/jetstream_input_stream.py | 68 ++++++------------- .../tests/test_jetstream_input_stream.py | 35 +++++----- example_atproto_rules/models/base.sml | 14 +--- example_atproto_rules/models/record/base.sml | 8 +-- example_atproto_rules/models/record/post.sml | 2 +- 6 files changed, 66 insertions(+), 110 deletions(-) diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index 62eff838..4874dfff 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -2,11 +2,11 @@ A sample Osprey plugin that consumes Bluesky's [JetStream](https://docs.bsky.app/blog/jetstream) — the public ATProto firehose — as the input event source. -This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. The Action shape it emits is intended to be source-compatible with [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset), so rules written for that ruleset can be pointed at this stream without translation. 
Out of the box it gives you: +This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. Out of the box it gives you: -- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s for ATProto identity and commit events, +- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s with the JetStream JSON event passed through as-is, - realistic per-second event volume from the live Bluesky network — useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, -- a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes. +- a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes, with file structure modeled on [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset). ## Running @@ -28,42 +28,43 @@ Under the hood this stacks `docker-compose.atproto.yaml` on top of the main comp ## Action shape -### Commit events +The JetStream JSON event is passed through unchanged as the Action's `data` dict, so rules read JetStream-native paths directly. `action_name` is set to the event's `kind`. -JetStream `commit` events become `action_name='operation#'` (e.g., `operation#create`): +### Commit events (`action_name='commit'`) ``` -data = { - 'did': '', - 'operation': { - 'action': 'create' | 'update' | 'delete', - 'collection': 'app.bsky.feed.post', - 'path': '/', - 'cid': '', - 'record': { ... raw ATProto record ... }, - }, - 'eventMetadata': {}, # reserved for enrichment (handle, profile, account age, ...) +{ + "did": "did:plc:...", + "time_us": 1714500000000000, + "kind": "commit", + "commit": { + "rev": "...", + "operation": "create" | "update" | "delete", + "collection": "app.bsky.feed.post", + "rkey": "...", + "cid": "...", + "record": { ... 
raw ATProto record ... } + } } ``` -### Identity events - -JetStream `identity` events become `action_name='identity'`: +### Identity events (`action_name='identity'`) ``` -data = { - 'did': '', - 'identity': {'handle': ''}, - 'eventMetadata': {}, +{ + "did": "did:plc:...", + "time_us": ..., + "kind": "identity", + "identity": {"did": "...", "handle": "...", "seq": ..., "time": "..."} } ``` -Account events and any other kind are skipped. +Account events and any unrecognised kind are skipped. `action_id` is generated by the in-repo `snowflake-id-worker` (batched 250 IDs per HTTP call). ## Caveats - **Not production-ready.** No durable cursor / resume on restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. -- **No event enrichment.** `eventMetadata` is left empty; rules that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) need a separate enrichment service. +- **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset), which is fed by a separate enrichment pipeline rather than JetStream directly) need an additional service to populate those fields before rules can use them. - **Schema drift.** Bluesky has not committed to JetStream as a stable long-term API. Treat this as illustrative. 
diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 82018b75..b3a48caa 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -34,11 +34,11 @@ class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): plain JSON (no CBOR/CAR), making it a convenient high-volume source for exercising Osprey against real production traffic. See https://docs.bsky.app/blog/jetstream. - The Action shape this stream emits is intended to be source-compatible with - https://github.com/haileyok/atproto-ruleset, so rules written against that ruleset can be - pointed at this input stream without translation. Identity events become - ``action_name='identity'``; commit events become ``operation#``. - Account events are skipped — the sample is concerned with content events. + The JetStream JSON event is passed through unchanged as the Action's data dict, so rules + target JetStream-native paths directly: ``$.did``, ``$.kind``, ``$.commit.operation``, + ``$.commit.collection``, ``$.commit.record.text``, ``$.identity.handle``, etc. The + ``action_name`` is set to the event's ``kind`` (``'commit'`` / ``'identity'``). Account + events are skipped — the sample is concerned with content events. """ def __init__( @@ -101,52 +101,22 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: - """Map a JetStream event dict into an Osprey :class:`Action`, or return ``None`` to skip it. + """Wrap a JetStream event as an Osprey :class:`Action`, or return ``None`` to skip it. - Identity events become ``action_name='identity'``; commit events become - ``operation#`` with the body under ``$.operation.*``. Account events - and any other event kind are skipped. 
+ The event JSON is passed through as-is so rules can read JetStream-native paths + (``$.did``, ``$.commit.collection``, ``$.commit.record.text``, etc.). ``action_name`` + is set to the event's ``kind``. Only ``commit`` and ``identity`` events are emitted — + account events and any unrecognised kind are skipped. """ + kind = event.get('kind') + if kind not in ('commit', 'identity'): + return None time_us = event.get('time_us') if time_us is None: return None - timestamp = datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc) - did = event.get('did') or '' - - kind = event.get('kind') - if kind == 'identity': - identity = event.get('identity') or {} - return Action( - action_id=action_id, - action_name='identity', - data={ - 'did': did, - 'identity': {'handle': identity.get('handle')}, - 'eventMetadata': {}, - }, - timestamp=timestamp, - ) - - if kind == 'commit': - commit = event.get('commit') or {} - operation = commit.get('operation') - collection = commit.get('collection') or '' - rkey = commit.get('rkey') or '' - return Action( - action_id=action_id, - action_name=f'operation#{operation}' if operation else 'operation', - data={ - 'did': did, - 'operation': { - 'action': operation, - 'collection': collection, - 'path': f'{collection}/{rkey}' if collection and rkey else '', - 'cid': commit.get('cid'), - 'record': commit.get('record') or {}, - }, - 'eventMetadata': {}, - }, - timestamp=timestamp, - ) - - return None + return Action( + action_id=action_id, + action_name=kind, + data=event, + timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), + ) diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index 23865504..5d072eda 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -19,23 +19,21 @@ } -def test_post_commit_maps_to_operation_create(): +def 
test_post_commit_passes_through_jetstream_payload(): action = _event_to_action(SAMPLE_POST_COMMIT, action_id=42) assert action is not None assert action.action_id == 42 - assert action.action_name == 'operation#create' + assert action.action_name == 'commit' + # Action.data is the raw JetStream event — rules read $.did, $.commit.*, etc. + assert action.data is SAMPLE_POST_COMMIT assert action.data['did'] == 'did:plc:abc123' - op = action.data['operation'] - assert op['action'] == 'create' - assert op['collection'] == 'app.bsky.feed.post' - assert op['path'] == 'app.bsky.feed.post/abcdefg' - assert op['cid'] == 'bafyrei...' - assert op['record']['text'] == 'this is a test post' - assert action.data['eventMetadata'] == {} + assert action.data['commit']['operation'] == 'create' + assert action.data['commit']['collection'] == 'app.bsky.feed.post' + assert action.data['commit']['record']['text'] == 'this is a test post' -def test_like_commit_maps_to_operation_create_without_record_text(): +def test_like_commit_passes_through_jetstream_payload(): like = { **SAMPLE_POST_COMMIT, 'commit': { @@ -47,12 +45,12 @@ def test_like_commit_maps_to_operation_create_without_record_text(): action = _event_to_action(like, action_id=1) assert action is not None - assert action.action_name == 'operation#create' - assert action.data['operation']['collection'] == 'app.bsky.feed.like' - assert 'text' not in action.data['operation']['record'] + assert action.action_name == 'commit' + assert action.data['commit']['collection'] == 'app.bsky.feed.like' + assert 'text' not in action.data['commit']['record'] -def test_delete_commit_maps_with_empty_record(): +def test_delete_commit_passes_through_without_record(): delete = { **SAMPLE_POST_COMMIT, 'commit': { @@ -65,12 +63,12 @@ def test_delete_commit_maps_with_empty_record(): action = _event_to_action(delete, action_id=2) assert action is not None - assert action.action_name == 'operation#delete' - assert action.data['operation']['action'] == 
'delete' - assert action.data['operation']['record'] == {} + assert action.action_name == 'commit' + assert action.data['commit']['operation'] == 'delete' + assert 'record' not in action.data['commit'] -def test_identity_event_maps_to_identity_action(): +def test_identity_event_passes_through(): identity = { 'did': 'did:plc:xyz', 'time_us': 1714500000000001, @@ -82,7 +80,6 @@ def test_identity_event_maps_to_identity_action(): assert action is not None assert action.action_name == 'identity' assert action.data['identity']['handle'] == 'someone.bsky.social' - assert action.data['did'] == 'did:plc:xyz' def test_account_event_is_skipped(): diff --git a/example_atproto_rules/models/base.sml b/example_atproto_rules/models/base.sml index 6d8b5bb0..cafc322b 100644 --- a/example_atproto_rules/models/base.sml +++ b/example_atproto_rules/models/base.sml @@ -6,20 +6,8 @@ UserId: Entity[str] = EntityJson( required=False, ) -Handle: Entity[str] = EntityJson( - type='Handle', - path='$.eventMetadata.handle', - required=False, -) - -PdsHost: Entity[str] = EntityJson( - type='PdsHost', - path='$.eventMetadata.pdsHost', - required=False, -) - OperationKind: Optional[str] = JsonData( - path='$.operation.action', + path='$.commit.operation', required=False, ) diff --git a/example_atproto_rules/models/record/base.sml b/example_atproto_rules/models/record/base.sml index 742cd08b..02951408 100644 --- a/example_atproto_rules/models/record/base.sml +++ b/example_atproto_rules/models/record/base.sml @@ -5,19 +5,19 @@ IsUpdate = OperationKind == 'update' IsDelete = OperationKind == 'delete' Collection: str = JsonData( - path='$.operation.collection', + path='$.commit.collection', required=False, coerce_type=True, ) -Path: str = JsonData( - path='$.operation.path', +Rkey: str = JsonData( + path='$.commit.rkey', required=False, coerce_type=True, ) Cid: str = JsonData( - path='$.operation.cid', + path='$.commit.cid', required=False, coerce_type=True, ) diff --git 
a/example_atproto_rules/models/record/post.sml b/example_atproto_rules/models/record/post.sml index 5636aa3a..3f1ed47a 100644 --- a/example_atproto_rules/models/record/post.sml +++ b/example_atproto_rules/models/record/post.sml @@ -1,7 +1,7 @@ Import(rules=['models/base.sml']) PostText: str = JsonData( - path='$.operation.record.text', + path='$.commit.record.text', required=False, coerce_type=True, ) From c2c07469b25b143ad81bc261a75d50e8fd80ff7d Mon Sep 17 00:00:00 2001 From: hailey Date: Wed, 6 May 2026 00:57:52 +0000 Subject: [PATCH 05/18] Address review feedback on JetStream sample - IMP-1: Add 30s socket read timeout to detect stalled connections - IMP-2: Track in-process cursor (time_us) to resume from last event on reconnect - IMP-3: Add test coverage for _build_url wantedCollections format, cursor param presence, and malformed message handling - IMP-4: Drop snowflake-id-worker dependency; pass action_id=0 to rely on RulesSink fallback minting - MIN-5: Drop coerce_type=True from string-typed JsonData fields (Collection, Rkey, Cid, PostText) - MIN-7: Stricter time_us validation: reject 0 and negative values, add unit tests - MIN-8: Change debug log to info for socket-close errors - MIN-9: Clarify docker-compose stack startup time in README - MIN-6: Update atproto-ruleset caveat to note enrichment pipeline shape mismatch Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 9 +-- .../atproto_plugin/jetstream_input_stream.py | 25 +++---- .../tests/test_jetstream_input_stream.py | 70 ++++++++++++++++++- example_atproto_rules/models/record/base.sml | 3 - example_atproto_rules/models/record/post.sml | 1 - uv.lock | 12 ++-- 6 files changed, 92 insertions(+), 28 deletions(-) diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index 4874dfff..37f87ed8 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -16,7 +16,7 @@ From the repo root: ./run-atproto.sh ``` -Under the 
hood this stacks `docker-compose.atproto.yaml` on top of the main compose file, swapping the worker's input source from Kafka to the JetStream plugin and pointing it at `example_atproto_rules` instead of `example_rules`. +This brings up the full Osprey local stack (Druid, Postgres, Bigtable, MinIO, Kafka) along with a JetStream WebSocket override, and swaps the worker's input source from Kafka to the JetStream plugin, pointing it at `example_atproto_rules` instead of `example_rules`. First-run startup takes a few minutes. ## Configuration @@ -61,10 +61,11 @@ The JetStream JSON event is passed through unchanged as the Action's `data` dict Account events and any unrecognised kind are skipped. -`action_id` is generated by the in-repo `snowflake-id-worker` (batched 250 IDs per HTTP call). +`action_id` is set to 0; the sink's fallback path (`RulesSink`) generates a real snowflake ID during classification. ## Caveats -- **Not production-ready.** No durable cursor / resume on restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. -- **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset), which is fed by a separate enrichment pipeline rather than JetStream directly) need an additional service to populate those fields before rules can use them. +- **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. In-process reconnect resumes from last seen event via socket-level cursor. 
+- **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) are fed by a separate enrichment pipeline, not JetStream directly. This plugin emits JetStream-native paths ($.did, $.commit.collection, etc.); enrichment-fed rulesets would need an enrichment service in front of this one or a different plugin. +- **No application-level keepalive.** Connection health is monitored via socket read timeout (30s). A stalled connection will be detected and trigger reconnection. - **Schema drift.** Bluesky has not committed to JetStream as a stable long-term API. Treat this as illustrative. diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index b3a48caa..40b87f8e 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -9,7 +9,6 @@ from osprey.engine.executor.execution_context import Action from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger -from osprey.worker.lib.snowflake import generate_snowflake_batch from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext @@ -24,7 +23,6 @@ 'app.bsky.graph.follow', 'app.bsky.actor.profile', ) -SNOWFLAKE_BATCH_SIZE = 250 class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): @@ -51,22 +49,18 @@ def __init__( self._endpoint = endpoint or DEFAULT_ENDPOINT self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) self._reconnect_seconds = reconnect_seconds - self._snowflake_buffer: List[int] = [] + self._last_time_us: Optional[int] = None def _build_url(self) -> str: 
params = [('wantedCollections', c) for c in self._wanted_collections] + if self._last_time_us is not None: + params.append(('cursor', str(self._last_time_us))) return f'{self._endpoint}?{urlencode(params)}' - def _next_action_id(self) -> int: - if not self._snowflake_buffer: - batch = generate_snowflake_batch(count=SNOWFLAKE_BATCH_SIZE, retries=3) - self._snowflake_buffer = [s.to_int() for s in batch] - return self._snowflake_buffer.pop() - def _gen(self) -> Iterator[BaseAckingContext[Action]]: - url = self._build_url() while True: try: + url = self._build_url() yield from self._stream_one_connection(url) except Exception as e: logger.exception(f'JetStream stream error; reconnecting in {self._reconnect_seconds}s: {e}') @@ -75,7 +69,7 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: logger.info(f'Connecting to JetStream at {url}') - ws = websocket.create_connection(url) + ws = websocket.create_connection(url, timeout=30) logger.info('JetStream connection established') try: while True: @@ -84,20 +78,23 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] continue try: event = json.loads(raw) - action = _event_to_action(event, action_id=self._next_action_id()) + action = _event_to_action(event, action_id=0) except Exception: logger.exception('skipping malformed JetStream event') sentry_sdk.capture_exception() continue if action is None: continue + time_us = event.get('time_us') + if time_us and isinstance(time_us, int) and time_us > 0: + self._last_time_us = time_us metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) yield NoopAckingContext(action) finally: try: ws.close() except Exception: - logger.debug('ignored error while closing JetStream socket', exc_info=True) + logger.info('ignored error while closing JetStream socket', exc_info=True) def _event_to_action(event: Dict[str, Any], action_id: int) -> 
Optional[Action]: @@ -112,7 +109,7 @@ def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: if kind not in ('commit', 'identity'): return None time_us = event.get('time_us') - if time_us is None: + if not isinstance(time_us, (int, float)) or time_us <= 0: return None return Action( action_id=action_id, diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index 5d072eda..b96a6e39 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -1,4 +1,7 @@ -from atproto_plugin.jetstream_input_stream import _event_to_action +from unittest import mock + +from atproto_plugin.jetstream_input_stream import JetStreamInputStream, _event_to_action +from websocket import WebSocketConnectionClosedException SAMPLE_POST_COMMIT = { 'did': 'did:plc:abc123', @@ -97,6 +100,71 @@ def test_event_without_time_us_is_skipped(): assert _event_to_action(bad, action_id=5) is None +def test_event_with_zero_time_us_is_skipped(): + zero_time = {**SAMPLE_POST_COMMIT, 'time_us': 0} + assert _event_to_action(zero_time, action_id=5) is None + + +def test_event_with_negative_time_us_is_skipped(): + neg_time = {**SAMPLE_POST_COMMIT, 'time_us': -1} + assert _event_to_action(neg_time, action_id=5) is None + + def test_unknown_kind_is_skipped(): weird = {'did': 'did:plc:xyz', 'time_us': 1, 'kind': 'something-new'} assert _event_to_action(weird, action_id=6) is None + + +def test_build_url_includes_wanted_collections(): + stream = JetStreamInputStream( + endpoint='wss://example.com/sub', + wanted_collections=['app.bsky.feed.post', 'app.bsky.feed.like'], + ) + url = stream._build_url() + assert 'wss://example.com/sub' in url + assert 'wantedCollections=app.bsky.feed.post' in url + assert 'wantedCollections=app.bsky.feed.like' in url + assert 'cursor=' not in url + + +def 
test_build_url_includes_cursor_when_last_time_us_set(): + stream = JetStreamInputStream( + endpoint='wss://example.com/sub', + wanted_collections=['app.bsky.feed.post'], + ) + stream._last_time_us = 1714500000000000 + url = stream._build_url() + assert 'wss://example.com/sub' in url + assert 'wantedCollections=app.bsky.feed.post' in url + assert 'cursor=1714500000000000' in url + + +def test_stream_one_connection_handles_malformed_json_and_gracefully_closes(): + class FakeWebSocket: + def __init__(self): + self.closed = False + self.recv_calls = 0 + + def recv(self): + self.recv_calls += 1 + if self.recv_calls == 1: + return b'not-json' + elif self.recv_calls == 2: + return b'{"did": "did:plc:x", "time_us": 1714500000000000, "kind": "commit", "commit": {"operation": "create", "collection": "app.bsky.feed.post", "rkey": "abc", "rev": "3kf...", "cid": "bafy...", "record": {"$type": "app.bsky.feed.post", "text": "hello"}}}' + else: + raise WebSocketConnectionClosedException() + + def close(self): + self.closed = True + + fake_ws = FakeWebSocket() + stream = JetStreamInputStream(reconnect_seconds=0.1) + + with mock.patch('websocket.create_connection', return_value=fake_ws): + actions = list(stream._stream_one_connection('wss://example.com/sub')) + + assert len(actions) == 1 + assert actions[0].item.action_id == 0 + assert actions[0].item.action_name == 'commit' + assert actions[0].item.data['did'] == 'did:plc:x' + assert fake_ws.closed diff --git a/example_atproto_rules/models/record/base.sml b/example_atproto_rules/models/record/base.sml index 02951408..e0b4f315 100644 --- a/example_atproto_rules/models/record/base.sml +++ b/example_atproto_rules/models/record/base.sml @@ -7,17 +7,14 @@ IsDelete = OperationKind == 'delete' Collection: str = JsonData( path='$.commit.collection', required=False, - coerce_type=True, ) Rkey: str = JsonData( path='$.commit.rkey', required=False, - coerce_type=True, ) Cid: str = JsonData( path='$.commit.cid', required=False, - 
coerce_type=True, ) diff --git a/example_atproto_rules/models/record/post.sml b/example_atproto_rules/models/record/post.sml index 3f1ed47a..405eda0c 100644 --- a/example_atproto_rules/models/record/post.sml +++ b/example_atproto_rules/models/record/post.sml @@ -3,5 +3,4 @@ Import(rules=['models/base.sml']) PostText: str = JsonData( path='$.commit.record.text', required=False, - coerce_type=True, ) diff --git a/uv.lock b/uv.lock index 4c8915c1..a2c0b9f1 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '4' and platform_machine == 'x86_64'", @@ -499,7 +499,7 @@ dependencies = [ [[package]] name = "example-atproto-plugins" version = "0.1.0" -source = { editable = "example_atproto_plugins" } +source = { virtual = "example_atproto_plugins" } dependencies = [ { name = "pluggy" }, { name = "websocket-client" }, @@ -514,7 +514,7 @@ requires-dist = [ [[package]] name = "example-plugins" version = "0.1.0" -source = { editable = "example_plugins" } +source = { virtual = "example_plugins" } dependencies = [ { name = "pluggy" }, ] @@ -941,6 +941,7 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/a3/1c/c42834d4fee45c5cf2d9546e97e879a1cbcdecfd16eb1a12144dcb91edae/grpcio-1.49.1.tar.gz", hash = "sha256:d4725fc9ec8e8822906ae26bb26f5546891aa7fbc3443de970cc556d43a5c99f", size = 22059239, upload-time = "2022-09-22T03:02:44.376Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/2d/e2/aaccddb8b06637625d847dbb5fe76ec3d15a74d89d983f5202f3666706e3/grpcio-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9fb17ff8c0d56099ac6ebfa84f670c5a62228d6b5c695cf21c02160c2ac1446b", size = 73399185, upload-time = "2022-09-22T02:57:56.219Z" }, { url = "https://files.pythonhosted.org/packages/90/0f/4d614d59f500835cbd27cb90743fb6b299098b0f22b8fd058d3586c933c0/grpcio-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = 
"sha256:075f2d06e3db6b48a2157a1bcd52d6cbdca980dd18988fe6afdb41795d51625f", size = 4296299, upload-time = "2022-09-22T02:58:01.417Z" }, { url = "https://files.pythonhosted.org/packages/4d/ea/359a98f8b3b4ff9a2f457a0d20ed81775a64149fbb7617177ed23d9d10c9/grpcio-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc79b2b37d779ac42341ddef40ad5bf0966a64af412c89fc2b062e3ddabb093f", size = 4656437, upload-time = "2022-09-22T02:58:06.23Z" }, { url = "https://files.pythonhosted.org/packages/fc/89/4952d2dff95f5b95db5943b2d1b55c82a485830b992f52f212b33616b523/grpcio-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49b301740cf5bc8fed4fee4c877570189ae3951432d79fa8e524b09353659811", size = 4888051, upload-time = "2022-09-22T02:58:11.411Z" }, @@ -1069,6 +1070,7 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/6c/e4/3416d25aebc4477141a491fae2c9494c5e0437a706c59103a936aac7d072/grpcio-tools-1.49.1.tar.gz", hash = "sha256:84cc64e5b46bad43d5d7bd2fd772b656eba0366961187a847e908e2cb735db91", size = 2252679, upload-time = "2022-09-22T03:03:00.279Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/c1/ba298fe650b67c9e31a7ad88b2fe1d8d22ff2c6a9e131604054835397dfc/grpcio_tools-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9e5c13809ab2f245398e8446c4c3b399a62d591db651e46806cccf52a700452e", size = 36912892, upload-time = "2022-09-22T03:00:51.237Z" }, { url = "https://files.pythonhosted.org/packages/9c/8b/a45a39bf7d1c4956d48179831e4da88c3f6ee14dbdcb273e575bbeb7de20/grpcio_tools-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:ab3d0ee9623720ee585fdf3753b3755d3144a4a8ae35bca8e3655fa2f41056be", size = 2025040, upload-time = "2022-09-22T03:00:55.219Z" }, { url = "https://files.pythonhosted.org/packages/6d/7f/89dc6036b91f8cbada98b06801ac2f5db60885000feaf88f9d7cabe665b7/grpcio_tools-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:13e13b3643e7577a3ec13b79689eb4d7548890b1e104c04b9ed6557a3c3dd452", size = 2370982, upload-time = "2022-09-22T03:00:59.807Z" }, { url = "https://files.pythonhosted.org/packages/01/98/4730bfff6bcd3163db8c3d70689e19a1a5f419152316edfc1f13ff06a5d7/grpcio_tools-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a64bab81b220c50033f584f57978ebbea575f09c1ccee765cd5c462177988098", size = 2731915, upload-time = "2022-09-22T03:01:05.44Z" }, @@ -1586,7 +1588,7 @@ wheels = [ [[package]] name = "osprey-rpc" version = "0.1.0" -source = { editable = "osprey_rpc" } +source = { virtual = "osprey_rpc" } [[package]] name = "osprey-worker" @@ -1600,7 +1602,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "flask-cors", specifier = ">=6.0.1" }, - { name = "osprey-rpc", editable = "osprey_rpc" }, + { name = "osprey-rpc", virtual = "osprey_rpc" }, ] [[package]] From 7cccb4b12351a166ed072599ecc73369bff3ceca Mon Sep 17 00:00:00 2001 From: hailey Date: Wed, 6 May 2026 01:18:42 +0000 Subject: [PATCH 06/18] Fix _stream_one_connection test: catch close exception, use _item attr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The new test from the previous commit never actually ran (pytest can't import the workspace package outside Docker). Inside Docker the test revealed two issues: - WebSocketConnectionClosedException propagated out of _stream_one_connection, breaking list(gen). Catch it inside the loop and return cleanly — a closed connection is the expected end of a session, not an error worth reraising. _gen still reconnects since the generator just finishes. - NoopAckingContext exposes _item, not item; tests now use the actual attribute name. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/atproto_plugin/jetstream_input_stream.py | 7 ++++++- .../tests/test_jetstream_input_stream.py | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 40b87f8e..6b2eddae 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -11,6 +11,7 @@ from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext +from websocket import WebSocketConnectionClosedException logger = get_logger() @@ -73,7 +74,11 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] logger.info('JetStream connection established') try: while True: - raw = ws.recv() + try: + raw = ws.recv() + except WebSocketConnectionClosedException: + logger.info('JetStream connection closed; will reconnect') + return if not raw: continue try: diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index b96a6e39..d1662052 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -164,7 +164,7 @@ def close(self): actions = list(stream._stream_one_connection('wss://example.com/sub')) assert len(actions) == 1 - assert actions[0].item.action_id == 0 - assert actions[0].item.action_name == 'commit' - assert actions[0].item.data['did'] == 'did:plc:x' + assert actions[0]._item.action_id == 0 + assert actions[0]._item.action_name == 'commit' + assert actions[0]._item.data['did'] == 'did:plc:x' assert fake_ws.closed From 
232c449bb1ffee19385a8efb0bf190a5d153f978 Mon Sep 17 00:00:00 2001 From: hailey Date: Wed, 6 May 2026 01:22:25 +0000 Subject: [PATCH 07/18] Tighten _event_to_action time_us check to int only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JetStream's contract is time_us: int (microseconds). The previous isinstance(time_us, (int, float)) was over-permissive and would have allowed _event_to_action to yield while the cursor-update narrowing at the call site (isinstance(time_us, int)) silently rejected the same value — leaving the cursor stale on reconnect. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/atproto_plugin/jetstream_input_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 6b2eddae..9a2b8eaf 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -114,7 +114,7 @@ def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: if kind not in ('commit', 'identity'): return None time_us = event.get('time_us') - if not isinstance(time_us, (int, float)) or time_us <= 0: + if not isinstance(time_us, int) or time_us <= 0: return None return Action( action_id=action_id, From 0c8f4620d6cb46d8a3a6e8f8c91d8d2d2024eb57 Mon Sep 17 00:00:00 2001 From: hailey Date: Wed, 6 May 2026 23:34:54 +0000 Subject: [PATCH 08/18] Re-lock uv.lock at revision 3 to match main and fix Docker build The two earlier commits on this branch that touched uv.lock (469219b, c2c0746) were generated with the older host uv, which downgraded the lockfile from rev 3 (main's format, with editable workspace sources) to rev 2 (with virtual workspace sources). Docker builds use a newer uv that requires rev 3, so `uv sync --locked` was failing in the worker image build. 
Regenerated with the uv shipped in the worker image so the lockfile again matches main's format. Source-only changes: revision bump, workspace members marked editable, and two unused linux_armv7l grpcio wheels dropped. Co-Authored-By: Claude Opus 4.7 (1M context) --- uv.lock | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/uv.lock b/uv.lock index a2c0b9f1..4c8915c1 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '4' and platform_machine == 'x86_64'", @@ -499,7 +499,7 @@ dependencies = [ [[package]] name = "example-atproto-plugins" version = "0.1.0" -source = { virtual = "example_atproto_plugins" } +source = { editable = "example_atproto_plugins" } dependencies = [ { name = "pluggy" }, { name = "websocket-client" }, @@ -514,7 +514,7 @@ requires-dist = [ [[package]] name = "example-plugins" version = "0.1.0" -source = { virtual = "example_plugins" } +source = { editable = "example_plugins" } dependencies = [ { name = "pluggy" }, ] @@ -941,7 +941,6 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/a3/1c/c42834d4fee45c5cf2d9546e97e879a1cbcdecfd16eb1a12144dcb91edae/grpcio-1.49.1.tar.gz", hash = "sha256:d4725fc9ec8e8822906ae26bb26f5546891aa7fbc3443de970cc556d43a5c99f", size = 22059239, upload-time = "2022-09-22T03:02:44.376Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2d/e2/aaccddb8b06637625d847dbb5fe76ec3d15a74d89d983f5202f3666706e3/grpcio-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9fb17ff8c0d56099ac6ebfa84f670c5a62228d6b5c695cf21c02160c2ac1446b", size = 73399185, upload-time = "2022-09-22T02:57:56.219Z" }, { url = "https://files.pythonhosted.org/packages/90/0f/4d614d59f500835cbd27cb90743fb6b299098b0f22b8fd058d3586c933c0/grpcio-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:075f2d06e3db6b48a2157a1bcd52d6cbdca980dd18988fe6afdb41795d51625f", size = 
4296299, upload-time = "2022-09-22T02:58:01.417Z" }, { url = "https://files.pythonhosted.org/packages/4d/ea/359a98f8b3b4ff9a2f457a0d20ed81775a64149fbb7617177ed23d9d10c9/grpcio-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc79b2b37d779ac42341ddef40ad5bf0966a64af412c89fc2b062e3ddabb093f", size = 4656437, upload-time = "2022-09-22T02:58:06.23Z" }, { url = "https://files.pythonhosted.org/packages/fc/89/4952d2dff95f5b95db5943b2d1b55c82a485830b992f52f212b33616b523/grpcio-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49b301740cf5bc8fed4fee4c877570189ae3951432d79fa8e524b09353659811", size = 4888051, upload-time = "2022-09-22T02:58:11.411Z" }, @@ -1070,7 +1069,6 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/6c/e4/3416d25aebc4477141a491fae2c9494c5e0437a706c59103a936aac7d072/grpcio-tools-1.49.1.tar.gz", hash = "sha256:84cc64e5b46bad43d5d7bd2fd772b656eba0366961187a847e908e2cb735db91", size = 2252679, upload-time = "2022-09-22T03:03:00.279Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e4/c1/ba298fe650b67c9e31a7ad88b2fe1d8d22ff2c6a9e131604054835397dfc/grpcio_tools-1.49.1-cp311-cp311-linux_armv7l.whl", hash = "sha256:9e5c13809ab2f245398e8446c4c3b399a62d591db651e46806cccf52a700452e", size = 36912892, upload-time = "2022-09-22T03:00:51.237Z" }, { url = "https://files.pythonhosted.org/packages/9c/8b/a45a39bf7d1c4956d48179831e4da88c3f6ee14dbdcb273e575bbeb7de20/grpcio_tools-1.49.1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:ab3d0ee9623720ee585fdf3753b3755d3144a4a8ae35bca8e3655fa2f41056be", size = 2025040, upload-time = "2022-09-22T03:00:55.219Z" }, { url = "https://files.pythonhosted.org/packages/6d/7f/89dc6036b91f8cbada98b06801ac2f5db60885000feaf88f9d7cabe665b7/grpcio_tools-1.49.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13e13b3643e7577a3ec13b79689eb4d7548890b1e104c04b9ed6557a3c3dd452", size = 2370982, upload-time = 
"2022-09-22T03:00:59.807Z" }, { url = "https://files.pythonhosted.org/packages/01/98/4730bfff6bcd3163db8c3d70689e19a1a5f419152316edfc1f13ff06a5d7/grpcio_tools-1.49.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:a64bab81b220c50033f584f57978ebbea575f09c1ccee765cd5c462177988098", size = 2731915, upload-time = "2022-09-22T03:01:05.44Z" }, @@ -1588,7 +1586,7 @@ wheels = [ [[package]] name = "osprey-rpc" version = "0.1.0" -source = { virtual = "osprey_rpc" } +source = { editable = "osprey_rpc" } [[package]] name = "osprey-worker" @@ -1602,7 +1600,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "flask-cors", specifier = ">=6.0.1" }, - { name = "osprey-rpc", virtual = "osprey_rpc" }, + { name = "osprey-rpc", editable = "osprey_rpc" }, ] [[package]] From 6bf2841d6e4732db277daecb91f85091fd972e1a Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 7 May 2026 21:18:04 +0000 Subject: [PATCH 09/18] Per-collection+operation action_names; UI default features per event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin emits action_name = '_' for commit events (create_post, delete_like, update_profile, ...) using the COLLECTION_NAMES map next to DEFAULT_COLLECTIONS, or 'identity' for identity events. Commits for unmapped collections or unexpected operations are skipped. Rules side adds three small JsonData feature models — IdentityHandle, LikeSubjectUri, FollowSubject — and imports them from main.sml so they evaluate for every action (None when the path doesn't resolve). example_atproto_rules/config/ui_config.yaml registers default_summary_features mapping action globs to the features the Osprey UI surfaces in the event stream — so each event type shows contextually relevant fields without rule-code changes. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 10 ++-- .../atproto_plugin/jetstream_input_stream.py | 38 +++++++++----- .../tests/test_jetstream_input_stream.py | 50 +++++++++++++++++-- example_atproto_rules/config/ui_config.yaml | 16 ++++++ example_atproto_rules/main.sml | 11 +++- example_atproto_rules/models/identity.sml | 6 +++ .../models/record/follow.sml | 6 +++ example_atproto_rules/models/record/like.sml | 6 +++ 8 files changed, 122 insertions(+), 21 deletions(-) create mode 100644 example_atproto_rules/config/ui_config.yaml create mode 100644 example_atproto_rules/models/identity.sml create mode 100644 example_atproto_rules/models/record/follow.sml create mode 100644 example_atproto_rules/models/record/like.sml diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index 37f87ed8..88377e36 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -28,9 +28,9 @@ This brings up the full Osprey local stack (Druid, Postgres, Bigtable, MinIO, Ka ## Action shape -The JetStream JSON event is passed through unchanged as the Action's `data` dict, so rules read JetStream-native paths directly. `action_name` is set to the event's `kind`. +The JetStream JSON event is passed through unchanged as the Action's `data` dict, so rules read JetStream-native paths directly. `action_name` is `<operation>_<collection>` for commit events (`create_post`, `delete_like`, `update_profile`, …) using the short names defined in `COLLECTION_NAMES`, or `identity` for identity events. -### Commit events (`action_name='commit'`) +### Commit events (e.g. `create_post`, `delete_like`) ``` { @@ -59,7 +59,11 @@ The JetStream JSON event is passed through unchanged as the Action's `data` dict } ``` -Account events and any unrecognised kind are skipped. +Account events, commits for collections not in `COLLECTION_NAMES`, and commits with operations other than `create` / `update` / `delete` are skipped. 
+ +### UI default features + +`example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `LikeSubjectUri` for like events. Add new entries there to expose more fields without touching rule code. `action_id` is set to 0; the sink's fallback path (`RulesSink`) generates a real snowflake ID during classification. diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 9a2b8eaf..0348fb2a 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -17,13 +17,14 @@ DEFAULT_ENDPOINT = 'wss://jetstream2.us-west.bsky.network/subscribe' -DEFAULT_COLLECTIONS = ( - 'app.bsky.feed.post', - 'app.bsky.feed.like', - 'app.bsky.feed.repost', - 'app.bsky.graph.follow', - 'app.bsky.actor.profile', -) +COLLECTION_NAMES = { + 'app.bsky.feed.post': 'post', + 'app.bsky.feed.like': 'like', + 'app.bsky.feed.repost': 'repost', + 'app.bsky.graph.follow': 'follow', + 'app.bsky.actor.profile': 'profile', +} +DEFAULT_COLLECTIONS = tuple(COLLECTION_NAMES.keys()) class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): @@ -36,8 +37,9 @@ class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): The JetStream JSON event is passed through unchanged as the Action's data dict, so rules target JetStream-native paths directly: ``$.did``, ``$.kind``, ``$.commit.operation``, ``$.commit.collection``, ``$.commit.record.text``, ``$.identity.handle``, etc. The - ``action_name`` is set to the event's ``kind`` (``'commit'`` / ``'identity'``). Account - events are skipped — the sample is concerned with content events. + ``action_name`` is ``<operation>_<collection>`` for commit events (e.g. 
``create_post``, + ``delete_like``) using the short names in :data:`COLLECTION_NAMES`, or ``'identity'`` + for identity events. Account events and commits for unmapped collections are skipped. """ def __init__( @@ -106,9 +108,10 @@ def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: """Wrap a JetStream event as an Osprey :class:`Action`, or return ``None`` to skip it. The event JSON is passed through as-is so rules can read JetStream-native paths - (``$.did``, ``$.commit.collection``, ``$.commit.record.text``, etc.). ``action_name`` - is set to the event's ``kind``. Only ``commit`` and ``identity`` events are emitted — - account events and any unrecognised kind are skipped. + (``$.did``, ``$.commit.collection``, ``$.commit.record.text``, etc.). For commit + events ``action_name`` is ``<operation>_<collection>`` (e.g. ``create_post``); for + identity events it is ``'identity'``. Account events, commits for collections not + in :data:`COLLECTION_NAMES`, and commits with unexpected operations are skipped. 
""" kind = event.get('kind') if kind not in ('commit', 'identity'): @@ -116,9 +119,18 @@ def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: time_us = event.get('time_us') if not isinstance(time_us, int) or time_us <= 0: return None + if kind == 'commit': + commit = event.get('commit') or {} + operation = commit.get('operation') + short = COLLECTION_NAMES.get(commit.get('collection', '')) + if short is None or operation not in ('create', 'update', 'delete'): + return None + action_name = f'{operation}_{short}' + else: + action_name = 'identity' return Action( action_id=action_id, - action_name=kind, + action_name=action_name, data=event, timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), ) diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index d1662052..a90668ea 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -1,5 +1,6 @@ from unittest import mock +import pytest from atproto_plugin.jetstream_input_stream import JetStreamInputStream, _event_to_action from websocket import WebSocketConnectionClosedException @@ -27,7 +28,7 @@ def test_post_commit_passes_through_jetstream_payload(): assert action is not None assert action.action_id == 42 - assert action.action_name == 'commit' + assert action.action_name == 'create_post' # Action.data is the raw JetStream event — rules read $.did, $.commit.*, etc. 
assert action.data is SAMPLE_POST_COMMIT assert action.data['did'] == 'did:plc:abc123' @@ -48,7 +49,7 @@ def test_like_commit_passes_through_jetstream_payload(): action = _event_to_action(like, action_id=1) assert action is not None - assert action.action_name == 'commit' + assert action.action_name == 'create_like' assert action.data['commit']['collection'] == 'app.bsky.feed.like' assert 'text' not in action.data['commit']['record'] @@ -66,7 +67,7 @@ def test_delete_commit_passes_through_without_record(): action = _event_to_action(delete, action_id=2) assert action is not None - assert action.action_name == 'commit' + assert action.action_name == 'delete_post' assert action.data['commit']['operation'] == 'delete' assert 'record' not in action.data['commit'] @@ -115,6 +116,47 @@ def test_unknown_kind_is_skipped(): assert _event_to_action(weird, action_id=6) is None +@pytest.mark.parametrize( + 'collection,operation,expected_name', + [ + ('app.bsky.feed.post', 'create', 'create_post'), + ('app.bsky.feed.post', 'update', 'update_post'), + ('app.bsky.feed.post', 'delete', 'delete_post'), + ('app.bsky.feed.like', 'create', 'create_like'), + ('app.bsky.feed.like', 'delete', 'delete_like'), + ('app.bsky.feed.repost', 'create', 'create_repost'), + ('app.bsky.feed.repost', 'delete', 'delete_repost'), + ('app.bsky.graph.follow', 'create', 'create_follow'), + ('app.bsky.graph.follow', 'delete', 'delete_follow'), + ('app.bsky.actor.profile', 'update', 'update_profile'), + ], +) +def test_commit_action_name_combines_operation_and_collection(collection, operation, expected_name): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 'collection': collection, 'operation': operation}, + } + action = _event_to_action(event, action_id=1) + assert action is not None + assert action.action_name == expected_name + + +def test_commit_for_unmapped_collection_is_skipped(): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 
'collection': 'app.bsky.graph.starterpack'}, + } + assert _event_to_action(event, action_id=1) is None + + +def test_commit_with_unexpected_operation_is_skipped(): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 'operation': 'wat'}, + } + assert _event_to_action(event, action_id=1) is None + + def test_build_url_includes_wanted_collections(): stream = JetStreamInputStream( endpoint='wss://example.com/sub', @@ -165,6 +207,6 @@ def close(self): assert len(actions) == 1 assert actions[0]._item.action_id == 0 - assert actions[0]._item.action_name == 'commit' + assert actions[0]._item.action_name == 'create_post' assert actions[0]._item.data['did'] == 'did:plc:x' assert fake_ws.closed diff --git a/example_atproto_rules/config/ui_config.yaml b/example_atproto_rules/config/ui_config.yaml new file mode 100644 index 00000000..db8d5cb8 --- /dev/null +++ b/example_atproto_rules/config/ui_config.yaml @@ -0,0 +1,16 @@ +ui_config: + default_summary_features: + - actions: ['*'] + features: [UserId] + - actions: ['create_post', 'update_post'] + features: [PostText, Rkey] + - actions: ['delete_post'] + features: [Rkey] + - actions: ['create_like', 'delete_like'] + features: [LikeSubjectUri, Rkey] + - actions: ['create_follow', 'delete_follow'] + features: [FollowSubject, Rkey] + - actions: ['create_repost', 'delete_repost'] + features: [Rkey] + - actions: ['identity'] + features: [IdentityHandle] diff --git a/example_atproto_rules/main.sml b/example_atproto_rules/main.sml index d22a8b63..995d92d7 100644 --- a/example_atproto_rules/main.sml +++ b/example_atproto_rules/main.sml @@ -1,3 +1,12 @@ -Import(rules=['models/base.sml']) +Import( + rules=[ + 'models/base.sml', + 'models/identity.sml', + 'models/record/base.sml', + 'models/record/follow.sml', + 'models/record/like.sml', + 'models/record/post.sml', + ], +) Require(rule='rules/index.sml') diff --git a/example_atproto_rules/models/identity.sml b/example_atproto_rules/models/identity.sml new file 
mode 100644 index 00000000..73be9cce --- /dev/null +++ b/example_atproto_rules/models/identity.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +IdentityHandle: str = JsonData( + path='$.identity.handle', + required=False, +) diff --git a/example_atproto_rules/models/record/follow.sml b/example_atproto_rules/models/record/follow.sml new file mode 100644 index 00000000..f020f620 --- /dev/null +++ b/example_atproto_rules/models/record/follow.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +FollowSubject: str = JsonData( + path='$.commit.record.subject', + required=False, +) diff --git a/example_atproto_rules/models/record/like.sml b/example_atproto_rules/models/record/like.sml new file mode 100644 index 00000000..790513e2 --- /dev/null +++ b/example_atproto_rules/models/record/like.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +LikeSubjectUri: str = JsonData( + path='$.commit.record.subject.uri', + required=False, +) From 74ae20d9279724c5b0db97efd60a4b431dc4b0eb Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 7 May 2026 22:06:58 +0000 Subject: [PATCH 10/18] Mint action_ids in plugin to fix UI duplication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cycle-1 IMP-4 review change passed action_id=0 expecting RulesSink.run's fallback to mint a snowflake. The fallback's condition is `if not action.action_id and action.action_id != 0`, which short-circuits to False when action_id is exactly 0 — so action_id stays 0 forever. Every event landed in MinIO storage keyed action_id=0, overwriting the previous one. Druid retained N distinct rows but the event-scan UI fetched by action_id, returning the same MinIO object N times — hence the duplicate-event rendering. Restoring the local snowflake batch (250 ids per snowflake-id-worker call) so each event gets a distinct id. README updated to surface the SNOWFLAKE_API_ENDPOINT dependency.
Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 2 +- .../src/atproto_plugin/jetstream_input_stream.py | 11 ++++++++++- .../tests/test_jetstream_input_stream.py | 4 +++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index 88377e36..ff6ebe1d 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -65,7 +65,7 @@ Account events, commits for collections not in `COLLECTION_NAMES`, and commits w `example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `LikeSubjectUri` for like events. Add new entries there to expose more fields without touching rule code. -`action_id` is set to 0; the sink's fallback path (`RulesSink`) generates a real snowflake ID during classification. +`action_id` is minted from `snowflake-id-worker` in batches of 250. The plugin therefore needs `SNOWFLAKE_API_ENDPOINT` to be set (the local docker-compose stack provides it). 
## Caveats diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 0348fb2a..d531f527 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -9,6 +9,7 @@ from osprey.engine.executor.execution_context import Action from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger +from osprey.worker.lib.snowflake import generate_snowflake_batch from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext from websocket import WebSocketConnectionClosedException @@ -25,6 +26,7 @@ 'app.bsky.actor.profile': 'profile', } DEFAULT_COLLECTIONS = tuple(COLLECTION_NAMES.keys()) +SNOWFLAKE_BATCH_SIZE = 250 class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): @@ -53,6 +55,13 @@ def __init__( self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) self._reconnect_seconds = reconnect_seconds self._last_time_us: Optional[int] = None + self._snowflake_buffer: List[int] = [] + + def _next_action_id(self) -> int: + if not self._snowflake_buffer: + batch = generate_snowflake_batch(count=SNOWFLAKE_BATCH_SIZE, retries=3) + self._snowflake_buffer = [s.to_int() for s in batch] + return self._snowflake_buffer.pop() def _build_url(self) -> str: params = [('wantedCollections', c) for c in self._wanted_collections] @@ -85,7 +94,7 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] continue try: event = json.loads(raw) - action = _event_to_action(event, action_id=0) + action = _event_to_action(event, action_id=self._next_action_id()) except Exception: logger.exception('skipping malformed JetStream event') sentry_sdk.capture_exception() diff --git 
a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index a90668ea..47ef8f76 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -201,12 +201,14 @@ def close(self): fake_ws = FakeWebSocket() stream = JetStreamInputStream(reconnect_seconds=0.1) + # Pre-fill so _next_action_id() doesn't hit the snowflake-id-worker. + stream._snowflake_buffer = [12345] with mock.patch('websocket.create_connection', return_value=fake_ws): actions = list(stream._stream_one_connection('wss://example.com/sub')) assert len(actions) == 1 - assert actions[0]._item.action_id == 0 + assert actions[0]._item.action_id == 12345 assert actions[0]._item.action_name == 'create_post' assert actions[0]._item.data['did'] == 'did:plc:x' assert fake_ws.closed From b650d9480f03e307ee62b6728cbe9cfdf906af02 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 7 May 2026 22:37:50 +0000 Subject: [PATCH 11/18] Unify FollowSubject + LikeSubjectUri into one Subject feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Old shape evaluated FollowSubject ($.commit.record.subject as str) for every event. For like/repost records, the path resolves to a {uri, cid} dict, the str type check fails, and the engine increments __error_count by 1 on every like/repost — likely also what was breaking the timeseries chart, since errored events get aggregated differently. Replace with a single Subject feature in models/record/base.sml that prefers $.commit.record.subject.uri (the dict-shape) and falls back to $.commit.record.subject coerced to str (the follow DID-string shape) via ResolveOptional. Drop the per-collection follow.sml / like.sml files and update ui_config.yaml so likes / reposts / follows all reference the unified Subject. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 2 +- example_atproto_rules/config/ui_config.yaml | 8 ++------ example_atproto_rules/main.sml | 2 -- example_atproto_rules/models/record/base.sml | 17 +++++++++++++++++ example_atproto_rules/models/record/follow.sml | 6 ------ example_atproto_rules/models/record/like.sml | 6 ------ 6 files changed, 20 insertions(+), 21 deletions(-) delete mode 100644 example_atproto_rules/models/record/follow.sml delete mode 100644 example_atproto_rules/models/record/like.sml diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index ff6ebe1d..ed93a338 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -63,7 +63,7 @@ Account events, commits for collections not in `COLLECTION_NAMES`, and commits w ### UI default features -`example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `LikeSubjectUri` for like events. Add new entries there to expose more fields without touching rule code. +`example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `Subject` for like / repost / follow events. Add new entries there to expose more fields without touching rule code. `action_id` is minted from `snowflake-id-worker` in batches of 250. The plugin therefore needs `SNOWFLAKE_API_ENDPOINT` to be set (the local docker-compose stack provides it). 
diff --git a/example_atproto_rules/config/ui_config.yaml b/example_atproto_rules/config/ui_config.yaml index db8d5cb8..8b1d6036 100644 --- a/example_atproto_rules/config/ui_config.yaml +++ b/example_atproto_rules/config/ui_config.yaml @@ -6,11 +6,7 @@ ui_config: features: [PostText, Rkey] - actions: ['delete_post'] features: [Rkey] - - actions: ['create_like', 'delete_like'] - features: [LikeSubjectUri, Rkey] - - actions: ['create_follow', 'delete_follow'] - features: [FollowSubject, Rkey] - - actions: ['create_repost', 'delete_repost'] - features: [Rkey] + - actions: ['create_like', 'delete_like', 'create_repost', 'delete_repost', 'create_follow', 'delete_follow'] + features: [Subject, Rkey] - actions: ['identity'] features: [IdentityHandle] diff --git a/example_atproto_rules/main.sml b/example_atproto_rules/main.sml index 995d92d7..63227315 100644 --- a/example_atproto_rules/main.sml +++ b/example_atproto_rules/main.sml @@ -3,8 +3,6 @@ Import( 'models/base.sml', 'models/identity.sml', 'models/record/base.sml', - 'models/record/follow.sml', - 'models/record/like.sml', 'models/record/post.sml', ], ) diff --git a/example_atproto_rules/models/record/base.sml b/example_atproto_rules/models/record/base.sml index e0b4f315..f768b31a 100644 --- a/example_atproto_rules/models/record/base.sml +++ b/example_atproto_rules/models/record/base.sml @@ -18,3 +18,20 @@ Cid: str = JsonData( path='$.commit.cid', required=False, ) + +# Some collections (like, repost) have `record.subject = {uri, cid}`; others (follow) +# have `record.subject = "did:plc:..."`. Prefer the URI when present, else fall back to +# the raw subject value (coerced to str — for follows that's the DID; for the dict shape +# the URI path resolves first so the coerced repr is never used). 
+SubjectUri: Optional[str] = JsonData( + path='$.commit.record.subject.uri', + required=False, +) + +SubjectRaw: Optional[str] = JsonData( + path='$.commit.record.subject', + required=False, + coerce_type=True, +) + +Subject: str = ResolveOptional(optional_value=SubjectUri, default_value=SubjectRaw) diff --git a/example_atproto_rules/models/record/follow.sml b/example_atproto_rules/models/record/follow.sml deleted file mode 100644 index f020f620..00000000 --- a/example_atproto_rules/models/record/follow.sml +++ /dev/null @@ -1,6 +0,0 @@ -Import(rules=['models/base.sml']) - -FollowSubject: str = JsonData( - path='$.commit.record.subject', - required=False, -) diff --git a/example_atproto_rules/models/record/like.sml b/example_atproto_rules/models/record/like.sml deleted file mode 100644 index 790513e2..00000000 --- a/example_atproto_rules/models/record/like.sml +++ /dev/null @@ -1,6 +0,0 @@ -Import(rules=['models/base.sml']) - -LikeSubjectUri: str = JsonData( - path='$.commit.record.subject.uri', - required=False, -) From 58d1bcf07fb4ad37c90a0427c61ddbdfe9304ac2 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 7 May 2026 23:09:08 +0000 Subject: [PATCH 12/18] wording tweaks --- example_atproto_plugins/README.md | 11 ++++------- .../atproto_plugin/jetstream_input_stream.py | 18 ++++++------------ 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index ed93a338..835bc7e7 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -1,11 +1,9 @@ # example_atproto_plugins -A sample Osprey plugin that consumes Bluesky's [JetStream](https://docs.bsky.app/blog/jetstream) — the public ATProto firehose — as the input event source. - -This is the missing companion to `example_plugins/`: it shows how a real-world adopter wires Osprey up to their own platform's events. 
Out of the box it gives you: +A sample Osprey plugin that consumes ATProto's [JetStream](https://docs.bsky.app/blog/jetstream) as the input event source. It gives you: - a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s with the JetStream JSON event passed through as-is, -- realistic per-second event volume from the live Bluesky network — useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, +- realistic per-second event volume from the live Bluesky network, which is useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, - a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes, with file structure modeled on [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset). ## Running @@ -16,7 +14,7 @@ From the repo root: ./run-atproto.sh ``` -This brings up the full Osprey local stack (Druid, Postgres, Bigtable, MinIO, Kafka) along with a JetStream WebSocket override, and swaps the worker's input source from Kafka to the JetStream plugin, pointing it at `example_atproto_rules` instead of `example_rules`. First-run startup takes a few minutes. +This brings up the full Osprey local stack (Druid, Postgres, MinIO, Kafka) along with a JetStream websocket override, and swaps the worker's input source from Kafka to the JetStream plugin, pointing it at `example_atproto_rules` instead of `example_rules`. First-run startup takes a few minutes. ## Configuration @@ -69,7 +67,6 @@ Account events, commits for collections not in `COLLECTION_NAMES`, and commits w ## Caveats -- **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a Bluesky deployment. In-process reconnect resumes from last seen event via socket-level cursor. 
+- **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a real ATProto deployment. - **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) are fed by a separate enrichment pipeline, not JetStream directly. This plugin emits JetStream-native paths ($.did, $.commit.collection, etc.); enrichment-fed rulesets would need an enrichment service in front of this one or a different plugin. - **No application-level keepalive.** Connection health is monitored via socket read timeout (30s). A stalled connection will be detected and trigger reconnection. -- **Schema drift.** Bluesky has not committed to JetStream as a stable long-term API. Treat this as illustrative. diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index d531f527..0ab5eb78 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -30,18 +30,11 @@ class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): - """Subscribes to Bluesky's ATProto JetStream WebSocket and yields Osprey Actions. + """An Osprey event input stream that subscribes to the ATProto JetStream websocket and yields + Osprey actions. - JetStream is Bluesky's public real-time firehose for ATProto network activity emitted as - plain JSON (no CBOR/CAR), making it a convenient high-volume source for exercising Osprey - against real production traffic. See https://docs.bsky.app/blog/jetstream. 
- - The JetStream JSON event is passed through unchanged as the Action's data dict, so rules - target JetStream-native paths directly: ``$.did``, ``$.kind``, ``$.commit.operation``, - ``$.commit.collection``, ``$.commit.record.text``, ``$.identity.handle``, etc. The - ``action_name`` is ``<operation>_<collection>`` for commit events (e.g. ``create_post``, - ``delete_like``) using the short names in :data:`COLLECTION_NAMES`, or ``'identity'`` - for identity events. Account events and commits for unmapped collections are skipped. + The JetStream JSON event is passed through unchanged as the Action's data dict, so rules may + target the JetStream-native paths directly, like $.did, $.kind, or $.commit.operation. """ def __init__( @@ -114,7 +107,8 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: - """Wrap a JetStream event as an Osprey :class:`Action`, or return ``None`` to skip it. + """Wraps a JetStream event as an Osprey action, or returns None if it should be skipped. + """ The event JSON is passed through as-is so rules can read JetStream-native paths (``$.did``, ``$.commit.collection``, ``$.commit.record.text``, etc.).
For commit From 853aa74d3181dfed24d9f2322d990a68d55b1dc5 Mon Sep 17 00:00:00 2001 From: hailey Date: Thu, 7 May 2026 23:12:49 +0000 Subject: [PATCH 13/18] fix --- .../src/atproto_plugin/jetstream_input_stream.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 0ab5eb78..33789d94 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -107,15 +107,7 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: - """Wraps a JetStream event as an Osprey action, or returns None if it should be skipped. - """ - - The event JSON is passed through as-is so rules can read JetStream-native paths - (``$.did``, ``$.commit.collection``, ``$.commit.record.text``, etc.). For commit - events ``action_name`` is ``<operation>_<collection>`` (e.g. ``create_post``); for - identity events it is ``'identity'``. Account events, commits for collections not - in :data:`COLLECTION_NAMES`, and commits with unexpected operations are skipped.
- """ + """Wraps a JetStream event as an Osprey action, or returns None if it should be skipped.""" kind = event.get('kind') if kind not in ('commit', 'identity'): return None From 2cdb46205563facb2ba50fe9d6a26b4f6ac1497b Mon Sep 17 00:00:00 2001 From: hailey Date: Fri, 8 May 2026 01:22:23 +0000 Subject: [PATCH 14/18] add a changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7677b..3b24b478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `ParseInt` UDF — converts a numeric string to an integer ([#190](https://github.com/roostorg/osprey/pull/190) by [@bealsbe](https://github.com/bealsbe)) - Add `StringSlice` UDF which extracts a substring by index range ([#189](https://github.com/roostorg/osprey/pull/189) by [@bealsbe](https://github.com/bealsbe)) - Add `InExperiment` UDF which checks if an entity is in an experiment ([#203](https://github.com/roostorg/osprey/pull/203) by [@bealsbe](https://github.com/bealsbe)) +- Add ATProto JetStream example plugins and rules ([#236](https://github.com/roostorg/osprey/pull/236) by [@haileyok](https://github.com/haileyok)) ### 🐛 Bug fixes - Default to selecting all for event stream ([#194](https://github.com/roostorg/osprey/pull/194) by [@chimosky](https://github.com/chimosky)) From a591e1c68c60602981f4eee534690c8cb917dc8d Mon Sep 17 00:00:00 2001 From: hailey Date: Fri, 8 May 2026 03:16:13 +0000 Subject: [PATCH 15/18] Switch to WebSocketApp.run_forever for keepalive websocket.create_connection returns the low-level socket which doesn't auto-PING; the only thing keeping the connection alive was JetStream's volume plus the read timeout. Use WebSocketApp.run_forever with ping_interval=20 and ping_timeout=10 instead, and bridge the callback API into _gen via a gevent.queue.Queue. 
greenlet.link pushes a 'done' sentinel as a safety net so the generator never blocks if run_forever exits without firing on_close. Co-Authored-By: Claude Opus 4.7 (1M context) --- example_atproto_plugins/README.md | 2 +- .../atproto_plugin/jetstream_input_stream.py | 50 +++++++++++++++---- .../tests/test_jetstream_input_stream.py | 40 +++++++++------ 3 files changed, 67 insertions(+), 25 deletions(-) diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md index 835bc7e7..c844a43e 100644 --- a/example_atproto_plugins/README.md +++ b/example_atproto_plugins/README.md @@ -69,4 +69,4 @@ Account events, commits for collections not in `COLLECTION_NAMES`, and commits w - **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a real ATProto deployment. - **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) are fed by a separate enrichment pipeline, not JetStream directly. This plugin emits JetStream-native paths ($.did, $.commit.collection, etc.); enrichment-fed rulesets would need an enrichment service in front of this one or a different plugin. -- **No application-level keepalive.** Connection health is monitored via socket read timeout (30s). A stalled connection will be detected and trigger reconnection. +- **Connection health.** WebSocket-level PING/PONG keepalive runs every 20s with a 10s pong timeout (`websocket-client`'s `WebSocketApp.run_forever(ping_interval, ping_timeout)`). A stalled or dead connection is detected within ~30s and triggers a reconnect from the last seen `time_us` cursor. 
diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 33789d94..85743332 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -4,15 +4,16 @@ from typing import Any, Dict, Iterator, List, Optional from urllib.parse import urlencode +import gevent import sentry_sdk import websocket +from gevent.queue import Queue from osprey.engine.executor.execution_context import Action from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import generate_snowflake_batch from osprey.worker.sinks.sink.input_stream import BaseInputStream from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext -from websocket import WebSocketConnectionClosedException logger = get_logger() @@ -27,6 +28,8 @@ } DEFAULT_COLLECTIONS = tuple(COLLECTION_NAMES.keys()) SNOWFLAKE_BATCH_SIZE = 250 +PING_INTERVAL_SECONDS = 20 +PING_TIMEOUT_SECONDS = 10 class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): @@ -73,15 +76,43 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: time.sleep(self._reconnect_seconds) def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: + # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its + # callback API into this generator via a gevent.queue.Queue. The 'done' sentinel + # is pushed by on_close/on_error, and also as a safety net by greenlet.link in + # case run_forever exits without firing on_close (e.g., uncaught exception). 
+ queue: 'Queue[tuple[str, Optional[bytes]]]' = Queue() + + def on_open(ws: Any) -> None: + logger.info('JetStream connection established') + + def on_message(ws: Any, raw: Any) -> None: + queue.put(('message', raw)) + + def on_close(ws: Any, status: Any, msg: Any) -> None: + logger.info(f'JetStream connection closed (status={status}); will reconnect') + queue.put(('done', None)) + + def on_error(ws: Any, err: Any) -> None: + logger.warning(f'JetStream connection error: {err}; will reconnect') + queue.put(('done', None)) + logger.info(f'Connecting to JetStream at {url}') - ws = websocket.create_connection(url, timeout=30) - logger.info('JetStream connection established') + app = websocket.WebSocketApp( + url, + on_open=on_open, + on_message=on_message, + on_close=on_close, + on_error=on_error, + ) + runner = gevent.spawn( + app.run_forever, ping_interval=PING_INTERVAL_SECONDS, ping_timeout=PING_TIMEOUT_SECONDS + ) + runner.link(lambda _g: queue.put(('done', None))) + try: while True: - try: - raw = ws.recv() - except WebSocketConnectionClosedException: - logger.info('JetStream connection closed; will reconnect') + kind, raw = queue.get() + if kind == 'done': return if not raw: continue @@ -101,9 +132,10 @@ def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action] yield NoopAckingContext(action) finally: try: - ws.close() + app.close() except Exception: - logger.info('ignored error while closing JetStream socket', exc_info=True) + logger.info('ignored error while closing JetStream WebSocketApp', exc_info=True) + runner.join(timeout=5) def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index 47ef8f76..f8c43e3b 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -2,7 +2,6 @@ import pytest 
from atproto_plugin.jetstream_input_stream import JetStreamInputStream, _event_to_action -from websocket import WebSocketConnectionClosedException SAMPLE_POST_COMMIT = { 'did': 'did:plc:abc123', @@ -182,33 +181,44 @@ def test_build_url_includes_cursor_when_last_time_us_set(): def test_stream_one_connection_handles_malformed_json_and_gracefully_closes(): - class FakeWebSocket: - def __init__(self): + class FakeWebSocketApp: + def __init__(self, url, on_open=None, on_message=None, on_close=None, on_error=None): + self.url = url + self.on_open = on_open + self.on_message = on_message + self.on_close = on_close self.closed = False - self.recv_calls = 0 - def recv(self): - self.recv_calls += 1 - if self.recv_calls == 1: - return b'not-json' - elif self.recv_calls == 2: - return b'{"did": "did:plc:x", "time_us": 1714500000000000, "kind": "commit", "commit": {"operation": "create", "collection": "app.bsky.feed.post", "rkey": "abc", "rev": "3kf...", "cid": "bafy...", "record": {"$type": "app.bsky.feed.post", "text": "hello"}}}' - else: - raise WebSocketConnectionClosedException() + def run_forever(self, **kwargs): + self.on_open(self) + self.on_message(self, b'not-json') + self.on_message( + self, + b'{"did": "did:plc:x", "time_us": 1714500000000000, "kind": "commit",' + b' "commit": {"operation": "create", "collection": "app.bsky.feed.post",' + b' "rkey": "abc", "rev": "3kf...", "cid": "bafy...",' + b' "record": {"$type": "app.bsky.feed.post", "text": "hello"}}}', + ) + self.on_close(self, 1000, 'normal') def close(self): self.closed = True - fake_ws = FakeWebSocket() + captured = {} + + def factory(*args, **kwargs): + captured['app'] = FakeWebSocketApp(*args, **kwargs) + return captured['app'] + stream = JetStreamInputStream(reconnect_seconds=0.1) # Pre-fill so _next_action_id() doesn't hit the snowflake-id-worker. 
stream._snowflake_buffer = [12345] - with mock.patch('websocket.create_connection', return_value=fake_ws): + with mock.patch('websocket.WebSocketApp', factory): actions = list(stream._stream_one_connection('wss://example.com/sub')) assert len(actions) == 1 assert actions[0]._item.action_id == 12345 assert actions[0]._item.action_name == 'create_post' assert actions[0]._item.data['did'] == 'did:plc:x' - assert fake_ws.closed + assert captured['app'].closed From bd44b1d3376c908ff8fbc67037e0547139869414 Mon Sep 17 00:00:00 2001 From: hailey Date: Fri, 8 May 2026 03:21:50 +0000 Subject: [PATCH 16/18] Exponential reconnect backoff in JetStream input stream Previously every reconnect waited a fixed reconnect_seconds (default 2s), so a dead JetStream host or a bad URL would just be hammered every two seconds forever. Add max_reconnect_seconds (default 60s) and double the sleep on each session that produced no events; reset to the base on any session that yielded at least one event. Extracted the state update into _advance_backoff so it can be tested without spinning up the full _gen loop. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../atproto_plugin/jetstream_input_stream.py | 25 ++++++++++++++++--- .../tests/test_jetstream_input_stream.py | 21 ++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 85743332..1eb7ace5 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -45,11 +45,14 @@ def __init__( endpoint: Optional[str] = None, wanted_collections: Optional[List[str]] = None, reconnect_seconds: float = 2.0, + max_reconnect_seconds: float = 60.0, ): super().__init__() self._endpoint = endpoint or DEFAULT_ENDPOINT self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) self._reconnect_seconds = reconnect_seconds + self._max_reconnect_seconds = max_reconnect_seconds + self._next_reconnect_seconds = reconnect_seconds self._last_time_us: Optional[int] = None self._snowflake_buffer: List[int] = [] @@ -65,15 +68,31 @@ def _build_url(self) -> str: params.append(('cursor', str(self._last_time_us))) return f'{self._endpoint}?{urlencode(params)}' + def _advance_backoff(self, had_event: bool) -> None: + # Reset on a session that produced events; otherwise grow exponentially up to the cap + # so a dead host doesn't get hammered. 
+ if had_event: + self._next_reconnect_seconds = self._reconnect_seconds + else: + self._next_reconnect_seconds = min( + self._next_reconnect_seconds * 2, self._max_reconnect_seconds + ) + def _gen(self) -> Iterator[BaseAckingContext[Action]]: while True: + had_event = False try: url = self._build_url() - yield from self._stream_one_connection(url) + for ctx in self._stream_one_connection(url): + had_event = True + yield ctx except Exception as e: - logger.exception(f'JetStream stream error; reconnecting in {self._reconnect_seconds}s: {e}') + logger.exception(f'JetStream stream error: {e}') sentry_sdk.capture_exception(e) - time.sleep(self._reconnect_seconds) + + self._advance_backoff(had_event) + logger.info(f'Reconnecting in {self._next_reconnect_seconds:.1f}s') + time.sleep(self._next_reconnect_seconds) def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index f8c43e3b..c6dbf578 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -180,6 +180,27 @@ def test_build_url_includes_cursor_when_last_time_us_set(): assert 'cursor=1714500000000000' in url +def test_backoff_doubles_then_caps_after_no_event_sessions(): + stream = JetStreamInputStream(reconnect_seconds=1.0, max_reconnect_seconds=8.0) + assert stream._next_reconnect_seconds == 1.0 + + expected_after = [2.0, 4.0, 8.0, 8.0, 8.0] + for expected in expected_after: + stream._advance_backoff(had_event=False) + assert stream._next_reconnect_seconds == expected + + +def test_backoff_resets_after_session_that_yielded_events(): + stream = JetStreamInputStream(reconnect_seconds=1.0, max_reconnect_seconds=8.0) + + for _ in range(5): + stream._advance_backoff(had_event=False) + assert 
stream._next_reconnect_seconds == 8.0 + + stream._advance_backoff(had_event=True) + assert stream._next_reconnect_seconds == 1.0 + + def test_stream_one_connection_handles_malformed_json_and_gracefully_closes(): class FakeWebSocketApp: def __init__(self, url, on_open=None, on_message=None, on_close=None, on_error=None): From ff18575fa09fe325bd4776a1ab17335825b054e6 Mon Sep 17 00:00:00 2001 From: hailey Date: Fri, 8 May 2026 03:23:39 +0000 Subject: [PATCH 17/18] Drop Sentry usage and split JSON-decode error handling Examples shouldn't pull in observability infra. Removed sentry_sdk import + the two capture_exception calls from the plugin entirely. Replaced the catch-all `except Exception` around `json.loads` plus `_event_to_action` with three narrower paths: - json.JSONDecodeError logs the parser message and the first 200 bytes of the raw payload so a non-JSON message is identifiable in logs. - A non-dict JSON payload (list, scalar, etc.) is logged with the parsed type name. - snowflake_id_worker minting failures (the only realistic raise from _next_action_id) are logged distinctly so they don't get conflated with payload-shape problems. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../atproto_plugin/jetstream_input_stream.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index 1eb7ace5..f8aed6b1 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -5,7 +5,6 @@ from urllib.parse import urlencode import gevent -import sentry_sdk import websocket from gevent.queue import Queue from osprey.engine.executor.execution_context import Action @@ -88,7 +87,6 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: yield ctx except Exception as e: logger.exception(f'JetStream stream error: {e}') - sentry_sdk.capture_exception(e) self._advance_backoff(had_event) logger.info(f'Reconnecting in {self._next_reconnect_seconds:.1f}s') @@ -137,11 +135,25 @@ def on_error(ws: Any, err: Any) -> None: continue try: event = json.loads(raw) - action = _event_to_action(event, action_id=self._next_action_id()) + except json.JSONDecodeError as e: + raw_bytes = raw if isinstance(raw, bytes) else str(raw).encode('utf-8', errors='replace') + logger.warning( + f'JetStream payload was not valid JSON ({e}); ' + f'first 200 bytes: {raw_bytes[:200]!r}' + ) + continue + if not isinstance(event, dict): + logger.warning( + f'JetStream payload parsed to non-object JSON ' + f'(got {type(event).__name__}); skipping' + ) + continue + try: + action_id = self._next_action_id() except Exception: - logger.exception('skipping malformed JetStream event') - sentry_sdk.capture_exception() + logger.exception('failed to mint action_id from snowflake-id-worker; skipping event') continue + action = _event_to_action(event, action_id=action_id) if action is None: continue time_us = event.get('time_us') From fbee1a8a0f4ae38e041cd23e0ee45f2914e48420 Mon Sep 17 00:00:00 
2001 From: hailey Date: Fri, 8 May 2026 03:32:57 +0000 Subject: [PATCH 18/18] Use osprey.worker.lib.backoff.Backoff instead of hand-rolled state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The repo already has a Backoff utility used by the etcd watcher and the bulk-label sink. Drop the hand-rolled _advance_backoff and _next_reconnect_seconds in favour of it; succeed() resets and fail() returns the next delay. The Backoff class also includes jitter, which the hand-rolled version did not. The dedicated _advance_backoff tests go away with the helper — the Backoff class is exercised elsewhere in the worker. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../atproto_plugin/jetstream_input_stream.py | 25 +++++++------------ .../tests/test_jetstream_input_stream.py | 21 ---------------- 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py index f8aed6b1..5921f95c 100644 --- a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -8,6 +8,7 @@ import websocket from gevent.queue import Queue from osprey.engine.executor.execution_context import Action +from osprey.worker.lib.backoff import Backoff from osprey.worker.lib.instruments import metrics from osprey.worker.lib.osprey_shared.logging import get_logger from osprey.worker.lib.snowflake import generate_snowflake_batch @@ -49,9 +50,7 @@ def __init__( super().__init__() self._endpoint = endpoint or DEFAULT_ENDPOINT self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) - self._reconnect_seconds = reconnect_seconds - self._max_reconnect_seconds = max_reconnect_seconds - self._next_reconnect_seconds = reconnect_seconds + self._backoff = Backoff(min_delay=reconnect_seconds, 
max_delay=max_reconnect_seconds) self._last_time_us: Optional[int] = None self._snowflake_buffer: List[int] = [] @@ -67,16 +66,6 @@ def _build_url(self) -> str: params.append(('cursor', str(self._last_time_us))) return f'{self._endpoint}?{urlencode(params)}' - def _advance_backoff(self, had_event: bool) -> None: - # Reset on a session that produced events; otherwise grow exponentially up to the cap - # so a dead host doesn't get hammered. - if had_event: - self._next_reconnect_seconds = self._reconnect_seconds - else: - self._next_reconnect_seconds = min( - self._next_reconnect_seconds * 2, self._max_reconnect_seconds - ) - def _gen(self) -> Iterator[BaseAckingContext[Action]]: while True: had_event = False @@ -88,9 +77,13 @@ def _gen(self) -> Iterator[BaseAckingContext[Action]]: except Exception as e: logger.exception(f'JetStream stream error: {e}') - self._advance_backoff(had_event) - logger.info(f'Reconnecting in {self._next_reconnect_seconds:.1f}s') - time.sleep(self._next_reconnect_seconds) + if had_event: + self._backoff.succeed() + delay = self._backoff.current + else: + delay = self._backoff.fail() + logger.info(f'Reconnecting in {delay:.1f}s') + time.sleep(delay) def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py index c6dbf578..f8c43e3b 100644 --- a/example_atproto_plugins/tests/test_jetstream_input_stream.py +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -180,27 +180,6 @@ def test_build_url_includes_cursor_when_last_time_us_set(): assert 'cursor=1714500000000000' in url -def test_backoff_doubles_then_caps_after_no_event_sessions(): - stream = JetStreamInputStream(reconnect_seconds=1.0, max_reconnect_seconds=8.0) - assert stream._next_reconnect_seconds == 1.0 - - expected_after = [2.0, 4.0, 
8.0, 8.0, 8.0] - for expected in expected_after: - stream._advance_backoff(had_event=False) - assert stream._next_reconnect_seconds == expected - - -def test_backoff_resets_after_session_that_yielded_events(): - stream = JetStreamInputStream(reconnect_seconds=1.0, max_reconnect_seconds=8.0) - - for _ in range(5): - stream._advance_backoff(had_event=False) - assert stream._next_reconnect_seconds == 8.0 - - stream._advance_backoff(had_event=True) - assert stream._next_reconnect_seconds == 1.0 - - def test_stream_one_connection_handles_malformed_json_and_gracefully_closes(): class FakeWebSocketApp: def __init__(self, url, on_open=None, on_message=None, on_close=None, on_error=None):