[examples] Bluesky/ATProto sample integration (input stream, rules) #236
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| # Override that swaps the synthetic Kafka producer for the live Bluesky JetStream | ||
| # firehose, via the example_atproto_plugins package. Stack on top of the main | ||
| # compose file: | ||
| # | ||
| # docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml up | ||
| # | ||
| # Or use the convenience wrapper: ./run-atproto.sh | ||
| services: | ||
| osprey-worker: | ||
| environment: | ||
| OSPREY_INPUT_STREAM_SOURCE: plugin | ||
| OSPREY_RULES_PATH: ./example_atproto_rules | ||
| volumes: | ||
| - ./example_atproto_rules:/osprey/example_atproto_rules | ||
| - ./example_atproto_plugins:/osprey/example_atproto_plugins | ||
|
|
||
| osprey-ui-api: | ||
| environment: | ||
| OSPREY_RULES_PATH: /osprey/example_atproto_rules | ||
| volumes: | ||
| - ./example_atproto_rules:/osprey/example_atproto_rules |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| # example_atproto_plugins | ||
|
|
||
| A sample Osprey plugin that consumes ATProto's [JetStream](https://docs.bsky.app/blog/jetstream) as the input event source. It gives you: | ||
|
|
||
| - a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s with the JetStream JSON event passed through as-is, | ||
| - realistic per-second event volume from the live Bluesky network, which is useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, | ||
| - a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes, with file structure modeled on [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset). | ||
|
|
||
| ## Running | ||
|
|
||
| From the repo root: | ||
|
|
||
| ```sh | ||
| ./run-atproto.sh | ||
| ``` | ||
|
|
||
| This brings up the full Osprey local stack (Druid, Postgres, MinIO, Kafka) with the JetStream compose override applied. The override swaps the worker's input source from Kafka to the JetStream plugin and points it at `example_atproto_rules` instead of `example_rules`. First-run startup takes a few minutes. | ||
|
|
||
| ## Configuration | ||
|
|
||
| | Env var | Default | Description | | ||
| | --- | --- | --- | | ||
| | `OSPREY_INPUT_STREAM_SOURCE` | (must be) `plugin` | Selects the plugin-provided stream. | | ||
| | `OSPREY_JETSTREAM_ENDPOINT` | `wss://jetstream2.us-west.bsky.network/subscribe` | JetStream WebSocket URL. | | ||
| | `OSPREY_JETSTREAM_WANTED_COLLECTIONS` | `app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow,app.bsky.actor.profile` | Comma-separated collections to subscribe to (server-side filter). | | ||
|
|
||
| ## Action shape | ||
|
|
||
| The JetStream JSON event is passed through unchanged as the Action's `data` dict, so rules read JetStream-native paths directly. `action_name` is `<operation>_<short>` for commit events (`create_post`, `delete_like`, `update_profile`, …) using the short names defined in `COLLECTION_NAMES`, or `identity` for identity events. | ||
|
|
||
| ### Commit events (e.g. `create_post`, `delete_like`) | ||
|
|
||
| ``` | ||
| { | ||
| "did": "did:plc:...", | ||
| "time_us": 1714500000000000, | ||
| "kind": "commit", | ||
| "commit": { | ||
| "rev": "...", | ||
| "operation": "create" | "update" | "delete", | ||
| "collection": "app.bsky.feed.post", | ||
| "rkey": "...", | ||
| "cid": "...", | ||
| "record": { ... raw ATProto record ... } | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ### Identity events (`action_name='identity'`) | ||
|
|
||
| ``` | ||
| { | ||
| "did": "did:plc:...", | ||
| "time_us": ..., | ||
| "kind": "identity", | ||
| "identity": {"did": "...", "handle": "...", "seq": ..., "time": "..."} | ||
| } | ||
| ``` | ||
|
|
||
| Account events, commits for collections not in `COLLECTION_NAMES`, and commits with operations other than `create` / `update` / `delete` are skipped. | ||
|
|
||
| ### UI default features | ||
|
|
||
| `example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `Subject` for like / repost / follow events. Add new entries there to expose more fields without touching rule code. | ||
|
|
||
| `action_id` is minted from `snowflake-id-worker` in batches of 250. The plugin therefore needs `SNOWFLAKE_API_ENDPOINT` to be set (the local docker-compose stack provides it). | ||
|
|
||
| ## Caveats | ||
|
|
||
| - **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a real ATProto deployment. | ||
| - **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) are fed by a separate enrichment pipeline, not JetStream directly. This plugin emits JetStream-native paths ($.did, $.commit.collection, etc.); enrichment-fed rulesets would need an enrichment service in front of this one or a different plugin. | ||
| - **Connection health.** WebSocket-level PING/PONG keepalive runs every 20s with a 10s pong timeout (`websocket-client`'s `WebSocketApp.run_forever(ping_interval, ping_timeout)`). A stalled or dead connection is detected within ~30s and triggers a reconnect from the last seen `time_us` cursor. |
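
To make the "Action shape" naming rule concrete, here is a minimal standalone sketch of how a JetStream event maps to an `action_name` and a UTC timestamp. It follows the README's description using plain dicts only. `describe_event` is a hypothetical helper written for illustration; it is not part of the plugin or of Osprey.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

# Short names copied from the plugin's COLLECTION_NAMES mapping.
COLLECTION_NAMES = {
    "app.bsky.feed.post": "post",
    "app.bsky.feed.like": "like",
    "app.bsky.feed.repost": "repost",
    "app.bsky.graph.follow": "follow",
    "app.bsky.actor.profile": "profile",
}
OPERATIONS = ("create", "update", "delete")


def describe_event(event: Dict[str, Any]) -> Optional[Tuple[str, datetime]]:
    """Hypothetical helper: return (action_name, timestamp), or None if the event is skipped."""
    time_us = event.get("time_us")
    if not isinstance(time_us, int) or time_us <= 0:
        return None
    kind = event.get("kind")
    if kind == "identity":
        name = "identity"
    elif kind == "commit":
        commit = event.get("commit") or {}
        operation = commit.get("operation")
        short = COLLECTION_NAMES.get(commit.get("collection", ""))
        if short is None or operation not in OPERATIONS:
            return None
        name = f"{operation}_{short}"
    else:
        # Account events and any other kinds are skipped.
        return None
    # time_us is microseconds since the Unix epoch.
    return name, datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc)


if __name__ == "__main__":
    sample = {
        "did": "did:plc:example",
        "time_us": 1714500000000000,
        "kind": "commit",
        "commit": {"operation": "create", "collection": "app.bsky.feed.post"},
    }
    name, ts = describe_event(sample)
    print(name, ts.isoformat())  # create_post 2024-04-30T18:00:00+00:00
```

Because the event dict is passed through untouched, a rule would still read fields such as `$.commit.collection` directly; only the name and timestamp are derived.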
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| [project] | ||
| name = "example_atproto_plugins" | ||
| version = "0.1.0" | ||
| description = "Example Osprey plugin that consumes Bluesky's ATProto JetStream firehose" | ||
| requires-python = ">=3.11" | ||
| dependencies = [ | ||
| "pluggy==1.5.0", | ||
| "websocket-client==1.8.0", | ||
| ] | ||
|
|
||
| [tool.setuptools] | ||
| package-dir = {"" = "src"} | ||
|
|
||
| [tool.setuptools.packages.find] | ||
| where = ["src"] | ||
|
|
||
| [project.entry-points.osprey_plugin] | ||
| atproto_plugins = "atproto_plugin.register_plugins" |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,187 @@ | ||||||||||||||||
| import json | ||||||||||||||||
| import time | ||||||||||||||||
| from datetime import datetime, timezone | ||||||||||||||||
| from typing import Any, Dict, Iterator, List, Optional | ||||||||||||||||
| from urllib.parse import urlencode | ||||||||||||||||
|
|
||||||||||||||||
| import gevent | ||||||||||||||||
| import websocket | ||||||||||||||||
| from gevent.queue import Queue | ||||||||||||||||
| from osprey.engine.executor.execution_context import Action | ||||||||||||||||
| from osprey.worker.lib.backoff import Backoff | ||||||||||||||||
| from osprey.worker.lib.instruments import metrics | ||||||||||||||||
| from osprey.worker.lib.osprey_shared.logging import get_logger | ||||||||||||||||
| from osprey.worker.lib.snowflake import generate_snowflake_batch | ||||||||||||||||
| from osprey.worker.sinks.sink.input_stream import BaseInputStream | ||||||||||||||||
| from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext | ||||||||||||||||
|
|
||||||||||||||||
| logger = get_logger() | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| DEFAULT_ENDPOINT = 'wss://jetstream2.us-west.bsky.network/subscribe' | ||||||||||||||||
| COLLECTION_NAMES = { | ||||||||||||||||
| 'app.bsky.feed.post': 'post', | ||||||||||||||||
| 'app.bsky.feed.like': 'like', | ||||||||||||||||
| 'app.bsky.feed.repost': 'repost', | ||||||||||||||||
| 'app.bsky.graph.follow': 'follow', | ||||||||||||||||
| 'app.bsky.actor.profile': 'profile', | ||||||||||||||||
| } | ||||||||||||||||
| DEFAULT_COLLECTIONS = tuple(COLLECTION_NAMES.keys()) | ||||||||||||||||
| SNOWFLAKE_BATCH_SIZE = 250 | ||||||||||||||||
| PING_INTERVAL_SECONDS = 20 | ||||||||||||||||
| PING_TIMEOUT_SECONDS = 10 | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): | ||||||||||||||||
| """An Osprey event input stream that subscribes to the ATProto JetStream websocket and yields | ||||||||||||||||
| Osprey actions. | ||||||||||||||||
|
|
||||||||||||||||
| The JetStream JSON event is passed through unchanged as the Action's data dict, so rules may | ||||||||||||||||
| target the JetStream-native paths directly, like $.did, $.kind, or $.commit.operation. | ||||||||||||||||
| """ | ||||||||||||||||
|
|
||||||||||||||||
| def __init__( | ||||||||||||||||
| self, | ||||||||||||||||
| endpoint: Optional[str] = None, | ||||||||||||||||
| wanted_collections: Optional[List[str]] = None, | ||||||||||||||||
| reconnect_seconds: float = 2.0, | ||||||||||||||||
| max_reconnect_seconds: float = 60.0, | ||||||||||||||||
| ): | ||||||||||||||||
| super().__init__() | ||||||||||||||||
| self._endpoint = endpoint or DEFAULT_ENDPOINT | ||||||||||||||||
| self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) | ||||||||||||||||
| self._backoff = Backoff(min_delay=reconnect_seconds, max_delay=max_reconnect_seconds) | ||||||||||||||||
| self._last_time_us: Optional[int] = None | ||||||||||||||||
| self._snowflake_buffer: List[int] = [] | ||||||||||||||||
|
|
||||||||||||||||
| def _next_action_id(self) -> int: | ||||||||||||||||
| if not self._snowflake_buffer: | ||||||||||||||||
| batch = generate_snowflake_batch(count=SNOWFLAKE_BATCH_SIZE, retries=3) | ||||||||||||||||
| self._snowflake_buffer = [s.to_int() for s in batch] | ||||||||||||||||
| return self._snowflake_buffer.pop() | ||||||||||||||||
|
|
||||||||||||||||
| def _build_url(self) -> str: | ||||||||||||||||
| params = [('wantedCollections', c) for c in self._wanted_collections] | ||||||||||||||||
| if self._last_time_us is not None: | ||||||||||||||||
| params.append(('cursor', str(self._last_time_us))) | ||||||||||||||||
| return f'{self._endpoint}?{urlencode(params)}' | ||||||||||||||||
|
|
||||||||||||||||
| def _gen(self) -> Iterator[BaseAckingContext[Action]]: | ||||||||||||||||
| while True: | ||||||||||||||||
|
Comment on lines +69 to +70
Member
We probably want to add a backoff. If JetStream is down or rate-limiting, you reconnect every 2 seconds in a tight loop and trigger escalating errors, potentially stricter rate limits, and a Sentry capture on every event. We can start with a simple |
||||||||||||||||
| had_event = False | ||||||||||||||||
| try: | ||||||||||||||||
| url = self._build_url() | ||||||||||||||||
| for ctx in self._stream_one_connection(url): | ||||||||||||||||
| had_event = True | ||||||||||||||||
| yield ctx | ||||||||||||||||
| except Exception as e: | ||||||||||||||||
| logger.exception(f'JetStream stream error: {e}') | ||||||||||||||||
|
|
||||||||||||||||
| if had_event: | ||||||||||||||||
| self._backoff.succeed() | ||||||||||||||||
| delay = self._backoff.current | ||||||||||||||||
| else: | ||||||||||||||||
| delay = self._backoff.fail() | ||||||||||||||||
| logger.info(f'Reconnecting in {delay:.1f}s') | ||||||||||||||||
| time.sleep(delay) | ||||||||||||||||
|
|
||||||||||||||||
| def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: | ||||||||||||||||
| # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its | ||||||||||||||||
| # callback API into this generator via a gevent.queue.Queue. The 'done' sentinel | ||||||||||||||||
|
Comment on lines +89 to +90
Member
Author
man websocket with gevent scares me |
||||||||||||||||
| # is pushed by on_close/on_error, and also as a safety net by greenlet.link in | ||||||||||||||||
| # case run_forever exits without firing on_close (e.g., uncaught exception). | ||||||||||||||||
| queue: 'Queue[tuple[str, Optional[bytes]]]' = Queue() | ||||||||||||||||
|
|
||||||||||||||||
| def on_open(ws: Any) -> None: | ||||||||||||||||
| logger.info('JetStream connection established') | ||||||||||||||||
|
|
||||||||||||||||
| def on_message(ws: Any, raw: Any) -> None: | ||||||||||||||||
| queue.put(('message', raw)) | ||||||||||||||||
|
|
||||||||||||||||
| def on_close(ws: Any, status: Any, msg: Any) -> None: | ||||||||||||||||
| logger.info(f'JetStream connection closed (status={status}); will reconnect') | ||||||||||||||||
| queue.put(('done', None)) | ||||||||||||||||
|
|
||||||||||||||||
| def on_error(ws: Any, err: Any) -> None: | ||||||||||||||||
| logger.warning(f'JetStream connection error: {err}; will reconnect') | ||||||||||||||||
| queue.put(('done', None)) | ||||||||||||||||
|
|
||||||||||||||||
| logger.info(f'Connecting to JetStream at {url}') | ||||||||||||||||
| app = websocket.WebSocketApp( | ||||||||||||||||
| url, | ||||||||||||||||
| on_open=on_open, | ||||||||||||||||
| on_message=on_message, | ||||||||||||||||
| on_close=on_close, | ||||||||||||||||
| on_error=on_error, | ||||||||||||||||
| ) | ||||||||||||||||
| runner = gevent.spawn( | ||||||||||||||||
| app.run_forever, ping_interval=PING_INTERVAL_SECONDS, ping_timeout=PING_TIMEOUT_SECONDS | ||||||||||||||||
| ) | ||||||||||||||||
| runner.link(lambda _g: queue.put(('done', None))) | ||||||||||||||||
|
|
||||||||||||||||
| try: | ||||||||||||||||
| while True: | ||||||||||||||||
| kind, raw = queue.get() | ||||||||||||||||
| if kind == 'done': | ||||||||||||||||
| return | ||||||||||||||||
| if not raw: | ||||||||||||||||
| continue | ||||||||||||||||
| try: | ||||||||||||||||
| event = json.loads(raw) | ||||||||||||||||
| except json.JSONDecodeError as e: | ||||||||||||||||
| raw_bytes = raw if isinstance(raw, bytes) else str(raw).encode('utf-8', errors='replace') | ||||||||||||||||
| logger.warning( | ||||||||||||||||
| f'JetStream payload was not valid JSON ({e}); ' | ||||||||||||||||
| f'first 200 bytes: {raw_bytes[:200]!r}' | ||||||||||||||||
| ) | ||||||||||||||||
| continue | ||||||||||||||||
| if not isinstance(event, dict): | ||||||||||||||||
| logger.warning( | ||||||||||||||||
| f'JetStream payload parsed to non-object JSON ' | ||||||||||||||||
| f'(got {type(event).__name__}); skipping' | ||||||||||||||||
| ) | ||||||||||||||||
| continue | ||||||||||||||||
| try: | ||||||||||||||||
| action_id = self._next_action_id() | ||||||||||||||||
| except Exception: | ||||||||||||||||
| logger.exception('failed to mint action_id from snowflake-id-worker; skipping event') | ||||||||||||||||
| continue | ||||||||||||||||
|
Comment on lines +146 to +148
Member
Suggested change
We should be more specific about the error. |
||||||||||||||||
| action = _event_to_action(event, action_id=action_id) | ||||||||||||||||
| if action is None: | ||||||||||||||||
| continue | ||||||||||||||||
| time_us = event.get('time_us') | ||||||||||||||||
| if time_us and isinstance(time_us, int) and time_us > 0: | ||||||||||||||||
| self._last_time_us = time_us | ||||||||||||||||
| metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) | ||||||||||||||||
| yield NoopAckingContext(action) | ||||||||||||||||
| finally: | ||||||||||||||||
| try: | ||||||||||||||||
| app.close() | ||||||||||||||||
| except Exception: | ||||||||||||||||
| logger.info('ignored error while closing JetStream WebSocketApp', exc_info=True) | ||||||||||||||||
| runner.join(timeout=5) | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: | ||||||||||||||||
| """Wraps a JetStream event as an Osprey action, or returns None if it should be skipped.""" | ||||||||||||||||
| kind = event.get('kind') | ||||||||||||||||
| if kind not in ('commit', 'identity'): | ||||||||||||||||
| return None | ||||||||||||||||
| time_us = event.get('time_us') | ||||||||||||||||
| if not isinstance(time_us, int) or time_us <= 0: | ||||||||||||||||
| return None | ||||||||||||||||
| if kind == 'commit': | ||||||||||||||||
| commit = event.get('commit') or {} | ||||||||||||||||
| operation = commit.get('operation') | ||||||||||||||||
| short = COLLECTION_NAMES.get(commit.get('collection', '')) | ||||||||||||||||
| if short is None or operation not in ('create', 'update', 'delete'): | ||||||||||||||||
| return None | ||||||||||||||||
| action_name = f'{operation}_{short}' | ||||||||||||||||
| else: | ||||||||||||||||
| action_name = 'identity' | ||||||||||||||||
| return Action( | ||||||||||||||||
| action_id=action_id, | ||||||||||||||||
| action_name=action_name, | ||||||||||||||||
| data=event, | ||||||||||||||||
| timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), | ||||||||||||||||
| ) | ||||||||||||||||
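
On the backoff point raised in review: the reconnect loop delegates to `osprey.worker.lib.backoff.Backoff`, whose implementation is not part of this diff. As a rough illustration of the behavior the loop appears to rely on (the delay grows on consecutive failures, is capped, and resets after a connection that delivered events), here is a minimal capped exponential backoff. `SimpleBackoff` and its `factor` parameter are hypothetical stand-ins, not the Osprey class.

```python
from dataclasses import dataclass, field


@dataclass
class SimpleBackoff:
    """Hypothetical stand-in for a capped exponential backoff (not Osprey's Backoff)."""

    min_delay: float = 2.0
    max_delay: float = 60.0
    factor: float = 2.0  # assumed growth factor, chosen for illustration
    _next: float = field(init=False)

    def __post_init__(self) -> None:
        self._next = self.min_delay

    def fail(self) -> float:
        """Return the delay to sleep now, then grow the next delay up to max_delay."""
        delay = self._next
        self._next = min(self.max_delay, self._next * self.factor)
        return delay

    def succeed(self) -> None:
        """Reset to the minimum delay after a healthy connection."""
        self._next = self.min_delay


if __name__ == "__main__":
    backoff = SimpleBackoff()
    print([backoff.fail() for _ in range(7)])  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
    backoff.succeed()
    print(backoff.fail())  # 2.0
```

Tracking the next delay directly, rather than a failure count used as an exponent, keeps the arithmetic bounded no matter how long an outage lasts. Adding random jitter would be a natural extension if many workers reconnect at once.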
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| from osprey.engine.executor.execution_context import Action | ||
| from osprey.worker.adaptor.plugin_manager import hookimpl_osprey | ||
| from osprey.worker.lib.config import Config | ||
| from osprey.worker.sinks.sink.input_stream import BaseInputStream | ||
| from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext | ||
|
|
||
| from atproto_plugin.jetstream_input_stream import JetStreamInputStream | ||
|
|
||
|
|
||
| @hookimpl_osprey | ||
| def register_input_stream(config: Config) -> BaseInputStream[BaseAckingContext[Action]]: | ||
| endpoint = config.get_optional_str('OSPREY_JETSTREAM_ENDPOINT') | ||
| raw_collections = config.get_optional_str('OSPREY_JETSTREAM_WANTED_COLLECTIONS') | ||
| wanted = [c.strip() for c in raw_collections.split(',') if c.strip()] if raw_collections else None | ||
| return JetStreamInputStream(endpoint=endpoint, wanted_collections=wanted) |
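
For reviewers who want to see what the hook's parsing plus `_build_url` produce on the wire, here is a small standalone sketch that combines the two. It reuses the same `urlencode` call and default endpoint as the diff; `parse_collections` and `build_url` are hypothetical free functions written for illustration, not names from the plugin.

```python
from typing import List, Optional
from urllib.parse import urlencode

DEFAULT_ENDPOINT = "wss://jetstream2.us-west.bsky.network/subscribe"


def parse_collections(raw: Optional[str]) -> List[str]:
    """Split a comma-separated env value, dropping blanks and surrounding whitespace."""
    if not raw:
        return []
    return [c.strip() for c in raw.split(",") if c.strip()]


def build_url(endpoint: str, collections: List[str], cursor: Optional[int] = None) -> str:
    """Repeat wantedCollections once per collection; append cursor only when resuming."""
    params = [("wantedCollections", c) for c in collections]
    if cursor is not None:
        params.append(("cursor", str(cursor)))
    return f"{endpoint}?{urlencode(params)}"


if __name__ == "__main__":
    wanted = parse_collections(" app.bsky.feed.post, ,app.bsky.feed.like ")
    print(wanted)  # ['app.bsky.feed.post', 'app.bsky.feed.like']
    print(build_url(DEFAULT_ENDPOINT, wanted, cursor=1714500000000000))
    # wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&cursor=1714500000000000
```

Passing a list of tuples to `urlencode` is what allows the same key to repeat, which a plain dict could not express.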
this is the main thing i'd love to get eyes on if anyone has python websocket experience... it doesn't need to be production stable since it's really a testing/example guy (not to mention jetstream shouldn't be used for real-world moderation tasks anyway), but it should be at least mostly stable...
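
For anyone reviewing the callback-to-generator bridge in `_stream_one_connection`, the pattern can be examined in isolation. The sketch below deliberately swaps gevent and `WebSocketApp` for the standard library's `threading` and `queue`, so it shows only the shape of the technique (a producer pushes messages, and a `done` sentinel is always pushed on exit), not gevent's scheduling behavior. `bridge` and `fake_source` are hypothetical names.

```python
import queue
import threading
from typing import Callable, Iterator, Optional, Tuple

Item = Tuple[str, Optional[str]]
OnMessage = Callable[[str], None]


def bridge(run: Callable[[OnMessage], None]) -> Iterator[str]:
    """Yield messages that `run` delivers to a callback from another thread."""
    q: "queue.Queue[Item]" = queue.Queue()

    def worker() -> None:
        try:
            run(lambda msg: q.put(("message", msg)))
        finally:
            # Safety net: push the sentinel even if `run` raises, so the
            # consumer below can never block forever on q.get().
            q.put(("done", None))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    try:
        while True:
            kind, payload = q.get()
            if kind == "done":
                return
            if payload is not None:
                yield payload
    finally:
        # Runs on normal exit and when the consumer abandons the generator.
        thread.join(timeout=5)


def fake_source(on_message: OnMessage) -> None:
    """Stand-in for a callback-driven client such as a WebSocket app."""
    for i in range(3):
        on_message(f"event-{i}")


if __name__ == "__main__":
    print(list(bridge(fake_source)))  # ['event-0', 'event-1', 'event-2']
```

The `finally` in `worker` plays the same role as `runner.link(...)` in the diff: the consumer terminates even when the producer dies without signaling a close.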
i did want to opt for a non-kafka option in here, because doing this helps break the misconception that osprey can only be used with kafka