[examples] Bluesky/ATProto sample integration (input stream, rules)#236
haileyok wants to merge 20 commits into
Mirrors the existing example_plugins/example_rules pattern with a custom register_input_stream hook that subscribes to Bluesky's JetStream WebSocket firehose. Lets contributors exercise Osprey against real production-rate event volume without the synthetic 1-event/sec generator. Stack docker-compose.atproto.yaml on top of the main compose file (or run ./run-atproto.sh) to use it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Action data shape now mirrors haileyok/atproto-ruleset: $.did,
$.operation.{action,collection,path,cid,record}, $.eventMetadata.
Identity events become action_name='identity'; commit events become
'operation#<create|update|delete>'.
- Rules tree restructured to match the index.sml routing pattern:
main.sml → rules/index.sml → rules/record/index.sml →
rules/record/post/index.sml → individual rule files. Models split
into models/{base,record/base,record/post}.sml.
- Use required=False on entities so non-applicable events don't emit
extraction errors (eliminates __error_count noise on non-post events).
- Snowflake-generated action_id via batched generate_snowflake_batch
(250 IDs per call) instead of time_us, since collisions are possible
at sustained throughput.
- Add unit tests for _event_to_action covering posts, likes, deletes,
identity, account skip, missing time_us, unknown kinds.
- Add app.bsky.actor.profile to default subscribed collections.
- Tighten per-event try/except so a malformed event no longer tears
down the WebSocket connection.
- Drop the Discord-monorepo-specific port-reset block from
docker-compose.atproto.yaml.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
mypy couldn't narrow `ws: Optional[WebSocket]` inside the recv loop because the variable also had to survive into the `finally` block. Moved the inner streaming + close lifecycle into _stream_one_connection, leaving _gen as just the reconnect loop.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Earlier review feedback pointed to haileyok/atproto-ruleset for project
*structure* — but that ruleset is fed by a separate enrichment pipeline,
not JetStream directly, and uses paths like $.eventMetadata.* and
$.operation.path that JetStream doesn't emit. Mapping JetStream events
into that shape was misleading: it pretended to expose enrichment
fields that are always null and synthesised paths that don't exist.
Reverted the data shape so Action.data is the JetStream JSON event
passed through unchanged. Rules now read JetStream-native paths:
$.did, $.kind, $.commit.{operation,collection,rkey,cid,record},
$.identity.handle, etc. action_name is the event's kind ('commit'
or 'identity'). The file hierarchy from atproto-ruleset (main.sml,
models/{base,record/...}, rules/index.sml routing) is preserved.
Verified end-to-end on the live worker:
- 7/7 unit tests pass in container
- 230,392 events processed post-restart, every one with __error_count: 0
- 253,820 commits + 142 identity events emitted with correct paths
- 70 PostContainsTestRule positive matches with label mutation
UserId/test-poster/1 emitted on posts containing "test"
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- IMP-1: Add 30s socket read timeout to detect stalled connections
- IMP-2: Track in-process cursor (time_us) to resume from last event on reconnect
- IMP-3: Add test coverage for _build_url wantedCollections format, cursor param presence, and malformed message handling
- IMP-4: Drop snowflake-id-worker dependency; pass action_id=0 to rely on RulesSink fallback minting
- MIN-5: Drop coerce_type=True from string-typed JsonData fields (Collection, Rkey, Cid, PostText)
- MIN-7: Stricter time_us validation: reject 0 and negative values, add unit tests
- MIN-8: Change debug log to info for socket-close errors
- MIN-9: Clarify docker-compose stack startup time in README
- MIN-6: Update atproto-ruleset caveat to note enrichment pipeline shape mismatch
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
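The cursor and collection filter from IMP-2 and IMP-3 might combine roughly as follows. This is a sketch, not the plugin's `_build_url`: the function name `build_url`, the base endpoint, and the one-`wantedCollections`-parameter-per-collection format are assumptions based on JetStream's public query parameters.

```python
from typing import Optional, Sequence
from urllib.parse import urlencode

# Assumed public JetStream endpoint; the plugin's default may differ.
BASE_URL = "wss://jetstream2.us-east.bsky.network/subscribe"


def build_url(collections: Sequence[str], cursor: Optional[int] = None) -> str:
    """Build a subscribe URL with one wantedCollections param per collection."""
    params = [("wantedCollections", c) for c in collections]
    if cursor is not None:
        # cursor is the time_us of the last event seen, so a reconnect
        # can resume from that point instead of from "now".
        params.append(("cursor", str(cursor)))
    return f"{BASE_URL}?{urlencode(params)}"
```

On first connect the cursor is omitted; after at least one event has been seen, the stored `time_us` is passed on every reconnect.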
The new test from the previous commit never actually ran (pytest can't import the workspace package outside Docker). Inside Docker the test revealed two issues:
- WebSocketConnectionClosedException propagated out of _stream_one_connection, breaking list(gen). Catch it inside the loop and return cleanly: a closed connection is the expected end of a session, not an error worth reraising. _gen still reconnects since the generator just finishes.
- NoopAckingContext exposes _item, not item; tests now use the actual attribute name.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
JetStream's contract is time_us: int (microseconds). The previous isinstance(time_us, (int, float)) was over-permissive and would have allowed _event_to_action to yield while the cursor-update narrowing at the call site (isinstance(time_us, int)) silently rejected the same value, leaving the cursor stale on reconnect.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
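A single validation helper shared by both call sites would keep the two checks from drifting apart again. The sketch below is illustrative only: `valid_time_us` is a hypothetical name, and the explicit `bool` exclusion is an extra assumption that the commit does not mention.

```python
from typing import Any, Optional


def valid_time_us(value: Any) -> Optional[int]:
    """Return value if it is usable as a JetStream cursor, else None."""
    # bool is a subclass of int in Python, so exclude it explicitly
    # (assumption: the commit itself only checks isinstance(value, int)).
    if isinstance(value, bool) or not isinstance(value, int):
        return None
    # Zero and negative timestamps are rejected as well (see MIN-7).
    return value if value > 0 else None
```

Using one predicate for both "should this event be yielded" and "should the cursor advance" means a value can never pass one check and fail the other.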
The two earlier commits on this branch that touched uv.lock (469219b, c2c0746) were generated with the older host uv, which downgraded the lockfile from rev 3 (main's format, with editable workspace sources) to rev 2 (with virtual workspace sources). Docker builds use a newer uv that requires rev 3, so `uv sync --locked` was failing in the worker image build. Regenerated with the uv shipped in the worker image so the lockfile again matches main's format. Source-only changes: revision bump, workspace members marked editable, and two unused linux_armv7l grpcio wheels dropped.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plugin emits action_name = '<operation>_<short>' for commit events (create_post, delete_like, update_profile, ...) using the COLLECTION_NAMES map next to DEFAULT_COLLECTIONS, or 'identity' for identity events. Commits for unmapped collections or unexpected operations are skipped.
Rules side adds three small JsonData feature models (IdentityHandle, LikeSubjectUri, FollowSubject) and imports them from main.sml so they evaluate for every action (None when the path doesn't resolve).
example_atproto_rules/config/ui_config.yaml registers default_summary_features mapping action globs to the features the Osprey UI surfaces in the event stream, so each event type shows contextually relevant fields without rule-code changes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
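The naming scheme can be sketched as below. The contents of `COLLECTION_NAMES` and the helper name `action_name_for` are assumptions for illustration; only the '<operation>_<short>' pattern, the 'identity' name, and the skip behaviour come from the commit message.

```python
from typing import Any, Dict, Optional

# Hypothetical subset of the collection-to-short-name map.
COLLECTION_NAMES: Dict[str, str] = {
    "app.bsky.feed.post": "post",
    "app.bsky.feed.like": "like",
    "app.bsky.feed.repost": "repost",
    "app.bsky.graph.follow": "follow",
    "app.bsky.actor.profile": "profile",
}
KNOWN_OPERATIONS = {"create", "update", "delete"}


def action_name_for(event: Dict[str, Any]) -> Optional[str]:
    """Return the action name for an event, or None if it should be skipped."""
    kind = event.get("kind")
    if kind == "identity":
        return "identity"
    if kind != "commit":
        return None
    commit = event.get("commit")
    if not isinstance(commit, dict):
        return None
    collection = commit.get("collection")
    operation = commit.get("operation")
    if not isinstance(collection, str) or not isinstance(operation, str):
        return None
    short = COLLECTION_NAMES.get(collection)
    # Unmapped collections and unexpected operations are skipped.
    if short is None or operation not in KNOWN_OPERATIONS:
        return None
    return f"{operation}_{short}"
```

Because the name is derived per event, the UI config can match on globs such as `create_*` without any rule-code changes.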
The cycle-1 IMP-4 review change passed action_id=0 expecting RulesSink.run's fallback to mint a snowflake. The fallback's condition is `if not action.action_id and action.action_id != 0`, which short-circuits to False when action_id is exactly 0, so action_id stays 0 forever. Every event landed in MinIO storage keyed action_id=0, overwriting the previous one. Druid retained N distinct rows but the event-scan UI fetched by action_id, returning the same MinIO object N times, hence the duplicate-event rendering.
Restoring the local snowflake batch (250 ids per snowflake-id-worker call) so each event gets a distinct id. README updated to surface the SNOWFLAKE_API_ENDPOINT dependency.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
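The behaviour of that condition is easy to check in isolation. In the sketch below only the boolean expression is taken from the commit message; the wrapper function `fallback_would_mint` is hypothetical.

```python
from typing import Optional


def fallback_would_mint(action_id: Optional[int]) -> bool:
    """Mirror the fallback condition quoted in the commit message."""
    # `not action_id` is True for both None and 0, but the second clause
    # explicitly excludes 0, so only None triggers minting.
    return not action_id and action_id != 0
```

`fallback_would_mint(None)` is True while `fallback_would_mint(0)` is False, which is why passing `action_id=0` never produced a fresh id.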
Old shape evaluated FollowSubject ($.commit.record.subject as str) for
every event. For like/repost records, the path resolves to a {uri, cid}
dict, the str type check fails, and the engine increments
__error_count by 1 on every like/repost — likely also what was
breaking the timeseries chart, since errored events get aggregated
differently.
Replace with a single Subject feature in models/record/base.sml that
prefers $.commit.record.subject.uri (the dict-shape) and falls back to
$.commit.record.subject coerced to str (the follow DID-string shape)
via ResolveOptional. Drop the per-collection follow.sml / like.sml
files and update ui_config.yaml so likes / reposts / follows all
reference the unified Subject.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
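The resolution order of the unified Subject feature can be illustrated in plain Python. This is not the SML model itself: `resolve_subject` is a hypothetical helper, and unlike the rule (which coerces the fallback to str) this sketch simply returns None for anything that is neither a `{uri, cid}` dict nor a string.

```python
from typing import Any, Dict, Optional


def resolve_subject(event: Dict[str, Any]) -> Optional[str]:
    """Prefer $.commit.record.subject.uri, fall back to a string subject."""
    commit = event.get("commit")
    record = commit.get("record") if isinstance(commit, dict) else None
    subject = record.get("subject") if isinstance(record, dict) else None
    if isinstance(subject, dict):
        # Like/repost shape: subject is a {uri, cid} dict.
        uri = subject.get("uri")
        return uri if isinstance(uri, str) else None
    if isinstance(subject, str):
        # Follow shape: subject is the followed account's DID.
        return subject
    # Posts, profiles, deletes, identity events: no subject at all.
    return None
```

Either shape yields a string, so a single feature can be evaluated on every event without tripping a type check.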
| SNOWFLAKE_BATCH_SIZE = 250 | ||
| class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): |
this is the main thing i'd love to get eyes on if anyone has python websocket experience...it doesn't need to be production stable since it's really a testing/example guy (not to mention jetstream shouldn't be used for real-world moderation tasks anyway) but it should be at least mostly stable...
i did want to opt for a non-kafka option in here, because doing this helps break the misconception that osprey can only be used with kafka
juanmrad
left a comment
Left some comments. My main concern is using WebSocket rather than WebSocketApp; with WebSocketApp we'd get the connection lifecycle (ping_interval / pong handling) for free.
| def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: | ||
| logger.info(f'Connecting to JetStream at {url}') | ||
| ws = websocket.create_connection(url, timeout=30) |
websocket.create_connection() returns the low-level WebSocket instance, which doesn't auto-PING. ref
That means no ping interval is keeping the connection alive, so today the only things keeping it healthy are JetStream's volume and the read timeout. You could either spawn a daemon to health-ping the socket or, better yet, use WebSocketApp.run_forever(ping_interval=20, ping_timeout=10) and bridge the callback API into the _gen iterator via a gevent.queue.Queue. ref
The docs explain my thinking here.
| def _gen(self) -> Iterator[BaseAckingContext[Action]]: | ||
| while True: |
We should probably add a backoff. If JetStream is down or rate-limiting, you reconnect every 2 seconds in a tight loop and trigger escalating errors, potentially stricter rate limits, and a Sentry capture on every event. We can start with something simple:
backoff = self._reconnect_seconds
while True:
.....
backoff = min(backoff * 2, 60.0)
| except Exception: | ||
| logger.exception('skipping malformed JetStream event') | ||
| sentry_sdk.capture_exception() | ||
| continue |
| except Exception: | |
| logger.exception('skipping malformed JetStream event') | |
| sentry_sdk.capture_exception() | |
| continue | |
| except json.JSONDecodeError: | |
| logger.warning('skipping malformed JetStream JSON') | |
| continue |
we should be more specific on the error.
websocket.create_connection returns the low-level socket which doesn't auto-PING; the only thing keeping the connection alive was JetStream's volume plus the read timeout. Use WebSocketApp.run_forever with ping_interval=20 and ping_timeout=10 instead, and bridge the callback API into _gen via a gevent.queue.Queue. greenlet.link pushes a 'done' sentinel as a safety net so the generator never blocks if run_forever exits without firing on_close.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
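The callback-to-generator bridge can be sketched without a network connection. To stay self-contained, this version swaps in the standard library's `queue.Queue` and a thread for `gevent.queue.Queue` and a greenlet, and uses `fake_run` as a stand-in for `WebSocketApp.run_forever`; all names here are hypothetical.

```python
import queue
import threading
from typing import Any, Callable, Iterator

_DONE = object()  # sentinel marking the end of one session


def stream_session(run: Callable[[Callable[[Any], None]], None]) -> Iterator[Any]:
    """Yield messages pushed by a callback-driven producer until it exits."""
    q: "queue.Queue[Any]" = queue.Queue()

    def worker() -> None:
        try:
            run(q.put)  # the producer calls on_message(msg) per message
        finally:
            # Safety net: always unblock the consumer, even if the
            # producer exits without signalling a close.
            q.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


def fake_run(on_message: Callable[[Any], None]) -> None:
    """Stand-in producer: emits two messages, then exits silently."""
    for msg in ("a", "b"):
        on_message(msg)
```

`list(stream_session(fake_run))` drains both messages and then terminates, because the `finally` clause plays the role that `greenlet.link` plays in the commit.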
Previously every reconnect waited a fixed reconnect_seconds (default 2s), so a dead JetStream host or a bad URL would just be hammered every two seconds forever. Add max_reconnect_seconds (default 60s) and double the sleep on each session that produced no events; reset to the base on any session that yielded at least one event. Extracted the state update into _advance_backoff so it can be tested without spinning up the full _gen loop.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
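The state update described here is small enough to show as a pure function. The signature below is an assumption (the commit names `_advance_backoff` but does not show its parameters), so treat it as a sketch of the doubling-with-cap rule rather than the plugin's code.

```python
def advance_backoff(current: float, base: float, maximum: float, yielded_events: bool) -> float:
    """Return the delay to use before the next reconnect attempt."""
    if yielded_events:
        # A productive session means the endpoint is healthy: start over.
        return base
    # An empty session doubles the delay, capped at the maximum.
    return min(current * 2, maximum)
```

With the defaults from the commit (base 2s, maximum 60s), consecutive empty sessions give 4, 8, 16, 32, 60, 60, and a single productive session drops the delay back to 2.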
Examples shouldn't pull in observability infra. Removed sentry_sdk import + the two capture_exception calls from the plugin entirely. Replaced the catch-all `except Exception` around `json.loads` plus `_event_to_action` with three narrower paths:
- json.JSONDecodeError logs the parser message and the first 200 bytes of the raw payload so a non-JSON message is identifiable in logs.
- A non-dict JSON payload (list, scalar, etc.) is logged with the parsed type name.
- snowflake_id_worker minting failures (the only realistic raise from _next_action_id) are logged distinctly so they don't get conflated with payload-shape problems.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
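The first two of those paths might look roughly like this. `parse_event` is a hypothetical helper, not the plugin's function, and it truncates the first 200 characters of a str payload where the commit speaks of bytes; the minting path is omitted because it depends on the repo's snowflake client.

```python
import json
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


def parse_event(raw: str) -> Optional[Dict[str, Any]]:
    """Parse one JetStream message, returning None if it should be skipped."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Log the parser message plus a bounded prefix of the payload.
        logger.warning("skipping non-JSON message (%s): %r", exc.msg, raw[:200])
        return None
    if not isinstance(payload, dict):
        # Valid JSON, but not an object (list, scalar, etc.).
        logger.warning("skipping JSON payload of type %s", type(payload).__name__)
        return None
    return payload
```

Catching only `json.JSONDecodeError` means an unexpected bug elsewhere still surfaces as an exception instead of being logged as a malformed event.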
| # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its | ||
| # callback API into this generator via a gevent.queue.Queue. The 'done' sentinel |
man websocket with gevent scares me
The repo already has a Backoff utility used by the etcd watcher and the bulk-label sink. Drop the hand-rolled _advance_backoff and _next_reconnect_seconds in favour of it; succeed() resets and fail() returns the next delay. The Backoff class also includes jitter, which the hand-rolled version did not. The dedicated _advance_backoff tests go away with the helper; the Backoff class is exercised elsewhere in the worker.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Description
Adds a new workspace package, example_atproto_plugins, demonstrating how to plug Osprey into a real-world event source via the existing register_input_stream pluggy hook. The sample subscribes to Bluesky's JetStream WebSocket firehose and maps each ATProto commit (posts, likes, reposts, follows) into an Osprey Action. Companion rules live under example_atproto_rules/.

The main motivation is making it cheap to exercise Osprey changes against realistic event shapes and volume: the existing osprey-kafka-test-data-producer emits one synthetic event per second from a single template, which doesn't catch issues that only show up with real production traffic. JetStream is free, public, no-auth, and runs at hundreds of events per second; in a few minutes of local testing this stack processed ~34k events of varied content.

A secondary benefit: it fills the gap between the README's "used by Bluesky" claim and the lack of any actual ATProto reference code in this repo. The existing example_plugins is Discord-shaped; this is the Bluesky-shaped counterpart.

Usage
Stacks docker-compose.atproto.yaml on top of the main compose file, switching the worker from OSPREY_INPUT_STREAM_SOURCE=kafka to =plugin and pointing it at example_atproto_rules/. The existing demo path (./demo.sh) is unchanged.

Notes / non-goals
- websocket-client==1.8.0 (Apache-2.0).
- uv.lock revision bumps from 2 to 3 because the project's [build-system] requires uv_build>=0.8.12,<0.9.0, which expects the newer lockfile format.
- osprey_worker/Dockerfile picks up the new workspace member alongside example_plugins.

Confidence Level
Confidence Level: Claude

Testing
- _event_to_action mapping for posts, likes, deletes, identity-event skip, and missing time_us.
- docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml config merges cleanly; main docker-compose.yaml still validates standalone.
- uv run ruff check, uv run ruff format --check clean.
- register_plugins (existing UDFs) and atproto_plugins (new input stream); no name collision.
- PostContainsTest rule evaluates correctly against real post text (gated by EventType == 'create_post').

Checklist
- uv run ruff check . passes
- uv tool run fawltydeps --check-unused --pyenv .venv: not run on host (venv permission issue from a docker volume mount); CI will check.
- CHANGELOG.md: leaving for reviewer to decide on appropriate entry.