diff --git a/AGENTS.md b/AGENTS.md index 966c59e9..0ffedf7b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,14 +12,16 @@ Top-level modules: - `osprey_coordinator/` — Rust gRPC coordinator (tokio, tonic, etcd, rdkafka). Rust code belongs here. - `proto/osprey/rpc/` — protobuf source of truth for `osprey_rpc` and `osprey_coordinator` types. - `example_plugins/` — reference plugins (UDFs, output sinks, labels service) using the pluggy-based plugin system. Do not add production code here. +- `example_atproto_plugins/` — reference plugin demonstrating a custom input stream that consumes the Bluesky firehose. Stack `docker-compose.atproto.yaml` on top of the main compose file (or use `./run-atproto.sh`) to run Osprey against live ATProto traffic. Do not add production code here. - `example_rules/` — sample SML rules and YAML config. +- `example_atproto_rules/` — sample SML rules paired with `example_atproto_plugins/`. Reference files: `docs/DEVELOPMENT.md` (setup), `example_plugins/src/register_plugins.py` (plugin patterns), `example_plugins/src/services/labels_service.py` (labels service example). ## Design - API: gRPC between `osprey_coordinator` and workers; HTTP/Flask for `osprey-ui-api` (port 5004); protobuf definitions under `proto/osprey/rpc/` are authoritative. -- Rules: SML (Osprey's rule language) with user-defined functions registered via pluggy hooks (`@hookimpl_osprey`): `register_udfs`, `register_output_sinks`, `register_labels_service_or_provider`. +- Rules: SML (Osprey's rule language) with user-defined functions registered via pluggy hooks (`@hookimpl_osprey`): `register_udfs`, `register_output_sinks`, `register_labels_service_or_provider`, `register_input_stream` (custom event source; see `example_atproto_plugins/`). - Data model conventions: Pydantic for models, SQLAlchemy for persistence (versions pinned in `pyproject.toml`). ## Build and run diff --git a/CHANGELOG.md b/CHANGELOG.md index c7d7677b..3b24b478 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `ParseInt` UDF — converts a numeric string to an integer ([#190](https://github.com/roostorg/osprey/pull/190) by [@bealsbe](https://github.com/bealsbe)) - Add `StringSlice` UDF which extracts a substring by index range ([#189](https://github.com/roostorg/osprey/pull/189) by [@bealsbe](https://github.com/bealsbe)) - Add `InExperiment` UDF which checks if an entity is in an experiment ([#203](https://github.com/roostorg/osprey/pull/203) by [@bealsbe](https://github.com/bealsbe)) +- Add ATProto JetStream example plugins and rules ([#236](https://github.com/roostorg/osprey/pull/236) by [@haileyok](https://github.com/haileyok)) ### 🐛 Bug fixes - Default to selecting all for event stream ([#194](https://github.com/roostorg/osprey/pull/194) by [@chimosky](https://github.com/chimosky)) diff --git a/docker-compose.atproto.yaml b/docker-compose.atproto.yaml new file mode 100644 index 00000000..c26469e6 --- /dev/null +++ b/docker-compose.atproto.yaml @@ -0,0 +1,21 @@ +# Override that swaps the synthetic Kafka producer for the live Bluesky JetStream +# firehose, via the example_atproto_plugins package. Stack on top of the main +# compose file: +# +# docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml up +# +# Or use the convenience wrapper: ./run-atproto.sh +services: + osprey-worker: + environment: + OSPREY_INPUT_STREAM_SOURCE: plugin + OSPREY_RULES_PATH: ./example_atproto_rules + volumes: + - ./example_atproto_rules:/osprey/example_atproto_rules + - ./example_atproto_plugins:/osprey/example_atproto_plugins + + osprey-ui-api: + environment: + OSPREY_RULES_PATH: /osprey/example_atproto_rules + volumes: + - ./example_atproto_rules:/osprey/example_atproto_rules diff --git a/example_atproto_plugins/README.md b/example_atproto_plugins/README.md new file mode 100644 index 00000000..c844a43e --- /dev/null +++ b/example_atproto_plugins/README.md @@ -0,0 +1,72 @@ +# example_atproto_plugins + +A sample Osprey plugin that consumes ATProto's [JetStream](https://docs.bsky.app/blog/jetstream) as the input event source. It gives you: + +- a `register_input_stream` hook implementation that subscribes to JetStream over WebSocket and yields Osprey `Action`s with the JetStream JSON event passed through as-is, +- realistic per-second event volume from the live Bluesky network, which is useful for load and soak testing changes that the synthetic 1-event/second producer doesn't exercise, +- a companion `example_atproto_rules/` tree showing how to organize rules against ATProto event shapes, with file structure modeled on [haileyok/atproto-ruleset](https://github.com/haileyok/atproto-ruleset). + +## Running + +From the repo root: + +```sh +./run-atproto.sh +``` + +This brings up the full Osprey local stack (Druid, Postgres, MinIO, Kafka) along with a JetStream websocket override, and swaps the worker's input source from Kafka to the JetStream plugin, pointing it at `example_atproto_rules` instead of `example_rules`. First-run startup takes a few minutes. + +## Configuration + +| Env var | Default | Description | +| --- | --- | --- | +| `OSPREY_INPUT_STREAM_SOURCE` | (must be) `plugin` | Selects the plugin-provided stream. | +| `OSPREY_JETSTREAM_ENDPOINT` | `wss://jetstream2.us-west.bsky.network/subscribe` | JetStream WebSocket URL. | +| `OSPREY_JETSTREAM_WANTED_COLLECTIONS` | `app.bsky.feed.post,app.bsky.feed.like,app.bsky.feed.repost,app.bsky.graph.follow,app.bsky.actor.profile` | Comma-separated collections to subscribe to (server-side filter). | + +## Action shape + +The JetStream JSON event is passed through unchanged as the Action's `data` dict, so rules read JetStream-native paths directly. `action_name` is `_` for commit events (`create_post`, `delete_like`, `update_profile`, …) using the short names defined in `COLLECTION_NAMES`, or `identity` for identity events. + +### Commit events (e.g. `create_post`, `delete_like`) + +``` +{ + "did": "did:plc:...", + "time_us": 1714500000000000, + "kind": "commit", + "commit": { + "rev": "...", + "operation": "create" | "update" | "delete", + "collection": "app.bsky.feed.post", + "rkey": "...", + "cid": "...", + "record": { ... raw ATProto record ... } + } +} +``` + +### Identity events (`action_name='identity'`) + +``` +{ + "did": "did:plc:...", + "time_us": ..., + "kind": "identity", + "identity": {"did": "...", "handle": "...", "seq": ..., "time": "..."} +} +``` + +Account events, commits for collections not in `COLLECTION_NAMES`, and commits with operations other than `create` / `update` / `delete` are skipped. + +### UI default features + +`example_atproto_rules/config/ui_config.yaml` declares the per-action default features the Osprey UI surfaces in the event stream — e.g. `PostText` for `create_post`, `IdentityHandle` for `identity`, `Subject` for like / repost / follow events. Add new entries there to expose more fields without touching rule code. + +`action_id` is minted from `snowflake-id-worker` in batches of 250. The plugin therefore needs `SNOWFLAKE_API_ENDPOINT` to be set (the local docker-compose stack provides it). + +## Caveats + +- **Not production-ready.** No durable cursor on process restart, no zstd compression, no DID-level filtering. Good for sample / load-testing purposes; not a drop-in for a real ATProto deployment. +- **No event enrichment.** JetStream only carries what's in the commit itself; rulesets that depend on handle / profile / account age (such as much of [atproto-ruleset](https://github.com/haileyok/atproto-ruleset)) are fed by a separate enrichment pipeline, not JetStream directly. This plugin emits JetStream-native paths ($.did, $.commit.collection, etc.); enrichment-fed rulesets would need an enrichment service in front of this one or a different plugin. +- **Connection health.** WebSocket-level PING/PONG keepalive runs every 20s with a 10s pong timeout (`websocket-client`'s `WebSocketApp.run_forever(ping_interval, ping_timeout)`). A stalled or dead connection is detected within ~30s and triggers a reconnect from the last seen `time_us` cursor. diff --git a/example_atproto_plugins/__init__.py b/example_atproto_plugins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/pyproject.toml b/example_atproto_plugins/pyproject.toml new file mode 100644 index 00000000..246b1a0f --- /dev/null +++ b/example_atproto_plugins/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "example_atproto_plugins" +version = "0.1.0" +description = "Example Osprey plugin that consumes Bluesky's ATProto JetStream firehose" +requires-python = ">=3.11" +dependencies = [ + "pluggy==1.5.0", + "websocket-client==1.8.0", +] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] + +[project.entry-points.osprey_plugin] +atproto_plugins = "atproto_plugin.register_plugins" diff --git a/example_atproto_plugins/src/atproto_plugin/__init__.py b/example_atproto_plugins/src/atproto_plugin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py new file mode 100644 index 00000000..5921f95c --- /dev/null +++ b/example_atproto_plugins/src/atproto_plugin/jetstream_input_stream.py @@ -0,0 +1,187 @@ +import json +import time +from datetime import datetime, timezone +from typing import Any, Dict, Iterator, List, Optional +from urllib.parse import urlencode + +import gevent +import websocket +from gevent.queue import Queue +from osprey.engine.executor.execution_context import Action +from osprey.worker.lib.backoff import Backoff +from osprey.worker.lib.instruments import metrics +from osprey.worker.lib.osprey_shared.logging import get_logger +from osprey.worker.lib.snowflake import generate_snowflake_batch +from osprey.worker.sinks.sink.input_stream import BaseInputStream +from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext, NoopAckingContext + +logger = get_logger() + + +DEFAULT_ENDPOINT = 'wss://jetstream2.us-west.bsky.network/subscribe' +COLLECTION_NAMES = { + 'app.bsky.feed.post': 'post', + 'app.bsky.feed.like': 'like', + 'app.bsky.feed.repost': 'repost', + 'app.bsky.graph.follow': 'follow', + 'app.bsky.actor.profile': 'profile', +} +DEFAULT_COLLECTIONS = tuple(COLLECTION_NAMES.keys()) +SNOWFLAKE_BATCH_SIZE = 250 +PING_INTERVAL_SECONDS = 20 +PING_TIMEOUT_SECONDS = 10 + + +class JetStreamInputStream(BaseInputStream[BaseAckingContext[Action]]): + """An Osprey event input stream that subscribes to the ATProto JetStream websocket and yields + Osprey actions. + + The JetStream JSON event is passed through unchanged as the Action's data dict, so rules may + target the JetStream-native paths directly, like $.did, $.kind, or $.commit.operation. + """ + + def __init__( + self, + endpoint: Optional[str] = None, + wanted_collections: Optional[List[str]] = None, + reconnect_seconds: float = 2.0, + max_reconnect_seconds: float = 60.0, + ): + super().__init__() + self._endpoint = endpoint or DEFAULT_ENDPOINT + self._wanted_collections = list(wanted_collections) if wanted_collections else list(DEFAULT_COLLECTIONS) + self._backoff = Backoff(min_delay=reconnect_seconds, max_delay=max_reconnect_seconds) + self._last_time_us: Optional[int] = None + self._snowflake_buffer: List[int] = [] + + def _next_action_id(self) -> int: + if not self._snowflake_buffer: + batch = generate_snowflake_batch(count=SNOWFLAKE_BATCH_SIZE, retries=3) + self._snowflake_buffer = [s.to_int() for s in batch] + return self._snowflake_buffer.pop() + + def _build_url(self) -> str: + params = [('wantedCollections', c) for c in self._wanted_collections] + if self._last_time_us is not None: + params.append(('cursor', str(self._last_time_us))) + return f'{self._endpoint}?{urlencode(params)}' + + def _gen(self) -> Iterator[BaseAckingContext[Action]]: + while True: + had_event = False + try: + url = self._build_url() + for ctx in self._stream_one_connection(url): + had_event = True + yield ctx + except Exception as e: + logger.exception(f'JetStream stream error: {e}') + + if had_event: + self._backoff.succeed() + delay = self._backoff.current + else: + delay = self._backoff.fail() + logger.info(f'Reconnecting in {delay:.1f}s') + time.sleep(delay) + + def _stream_one_connection(self, url: str) -> Iterator[BaseAckingContext[Action]]: + # WebSocketApp drives PING/PONG keepalive on its own greenlet; we bridge its + # callback API into this generator via a gevent.queue.Queue. The 'done' sentinel + # is pushed by on_close/on_error, and also as a safety net by greenlet.link in + # case run_forever exits without firing on_close (e.g., uncaught exception). + queue: 'Queue[tuple[str, Optional[bytes]]]' = Queue() + + def on_open(ws: Any) -> None: + logger.info('JetStream connection established') + + def on_message(ws: Any, raw: Any) -> None: + queue.put(('message', raw)) + + def on_close(ws: Any, status: Any, msg: Any) -> None: + logger.info(f'JetStream connection closed (status={status}); will reconnect') + queue.put(('done', None)) + + def on_error(ws: Any, err: Any) -> None: + logger.warning(f'JetStream connection error: {err}; will reconnect') + queue.put(('done', None)) + + logger.info(f'Connecting to JetStream at {url}') + app = websocket.WebSocketApp( + url, + on_open=on_open, + on_message=on_message, + on_close=on_close, + on_error=on_error, + ) + runner = gevent.spawn( + app.run_forever, ping_interval=PING_INTERVAL_SECONDS, ping_timeout=PING_TIMEOUT_SECONDS + ) + runner.link(lambda _g: queue.put(('done', None))) + + try: + while True: + kind, raw = queue.get() + if kind == 'done': + return + if not raw: + continue + try: + event = json.loads(raw) + except json.JSONDecodeError as e: + raw_bytes = raw if isinstance(raw, bytes) else str(raw).encode('utf-8', errors='replace') + logger.warning( + f'JetStream payload was not valid JSON ({e}); ' + f'first 200 bytes: {raw_bytes[:200]!r}' + ) + continue + if not isinstance(event, dict): + logger.warning( + f'JetStream payload parsed to non-object JSON ' + f'(got {type(event).__name__}); skipping' + ) + continue + try: + action_id = self._next_action_id() + except Exception: + logger.exception('failed to mint action_id from snowflake-id-worker; skipping event') + continue + action = _event_to_action(event, action_id=action_id) + if action is None: + continue + time_us = event.get('time_us') + if time_us and isinstance(time_us, int) and time_us > 0: + self._last_time_us = time_us + metrics.increment('jetstream_input_stream.events', tags=[f'action_name:{action.action_name}']) + yield NoopAckingContext(action) + finally: + try: + app.close() + except Exception: + logger.info('ignored error while closing JetStream WebSocketApp', exc_info=True) + runner.join(timeout=5) + + +def _event_to_action(event: Dict[str, Any], action_id: int) -> Optional[Action]: + """Wraps a JetStream event as an Osprey action, or returns None if it should be skipped.""" + kind = event.get('kind') + if kind not in ('commit', 'identity'): + return None + time_us = event.get('time_us') + if not isinstance(time_us, int) or time_us <= 0: + return None + if kind == 'commit': + commit = event.get('commit') or {} + operation = commit.get('operation') + short = COLLECTION_NAMES.get(commit.get('collection', '')) + if short is None or operation not in ('create', 'update', 'delete'): + return None + action_name = f'{operation}_{short}' + else: + action_name = 'identity' + return Action( + action_id=action_id, + action_name=action_name, + data=event, + timestamp=datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc), + ) diff --git a/example_atproto_plugins/src/atproto_plugin/register_plugins.py b/example_atproto_plugins/src/atproto_plugin/register_plugins.py new file mode 100644 index 00000000..dc6b0736 --- /dev/null +++ b/example_atproto_plugins/src/atproto_plugin/register_plugins.py @@ -0,0 +1,15 @@ +from osprey.engine.executor.execution_context import Action +from osprey.worker.adaptor.plugin_manager import hookimpl_osprey +from osprey.worker.lib.config import Config +from osprey.worker.sinks.sink.input_stream import BaseInputStream +from osprey.worker.sinks.utils.acking_contexts import BaseAckingContext + +from atproto_plugin.jetstream_input_stream import JetStreamInputStream + + +@hookimpl_osprey +def register_input_stream(config: Config) -> BaseInputStream[BaseAckingContext[Action]]: + endpoint = config.get_optional_str('OSPREY_JETSTREAM_ENDPOINT') + raw_collections = config.get_optional_str('OSPREY_JETSTREAM_WANTED_COLLECTIONS') + wanted = [c.strip() for c in raw_collections.split(',') if c.strip()] if raw_collections else None + return JetStreamInputStream(endpoint=endpoint, wanted_collections=wanted) diff --git a/example_atproto_plugins/tests/__init__.py b/example_atproto_plugins/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/example_atproto_plugins/tests/test_jetstream_input_stream.py b/example_atproto_plugins/tests/test_jetstream_input_stream.py new file mode 100644 index 00000000..f8c43e3b --- /dev/null +++ b/example_atproto_plugins/tests/test_jetstream_input_stream.py @@ -0,0 +1,224 @@ +from unittest import mock + +import pytest +from atproto_plugin.jetstream_input_stream import JetStreamInputStream, _event_to_action + +SAMPLE_POST_COMMIT = { + 'did': 'did:plc:abc123', + 'time_us': 1714500000000000, + 'kind': 'commit', + 'commit': { + 'rev': '3kf...', + 'operation': 'create', + 'collection': 'app.bsky.feed.post', + 'rkey': 'abcdefg', + 'cid': 'bafyrei...', + 'record': { + '$type': 'app.bsky.feed.post', + 'text': 'this is a test post', + 'createdAt': '2024-04-30T12:00:00Z', + }, + }, +} + + +def test_post_commit_passes_through_jetstream_payload(): + action = _event_to_action(SAMPLE_POST_COMMIT, action_id=42) + + assert action is not None + assert action.action_id == 42 + assert action.action_name == 'create_post' + # Action.data is the raw JetStream event — rules read $.did, $.commit.*, etc. + assert action.data is SAMPLE_POST_COMMIT + assert action.data['did'] == 'did:plc:abc123' + assert action.data['commit']['operation'] == 'create' + assert action.data['commit']['collection'] == 'app.bsky.feed.post' + assert action.data['commit']['record']['text'] == 'this is a test post' + + +def test_like_commit_passes_through_jetstream_payload(): + like = { + **SAMPLE_POST_COMMIT, + 'commit': { + **SAMPLE_POST_COMMIT['commit'], + 'collection': 'app.bsky.feed.like', + 'record': {'subject': {'uri': 'at://...', 'cid': 'bafy...'}}, + }, + } + action = _event_to_action(like, action_id=1) + + assert action is not None + assert action.action_name == 'create_like' + assert action.data['commit']['collection'] == 'app.bsky.feed.like' + assert 'text' not in action.data['commit']['record'] + + +def test_delete_commit_passes_through_without_record(): + delete = { + **SAMPLE_POST_COMMIT, + 'commit': { + 'operation': 'delete', + 'collection': 'app.bsky.feed.post', + 'rkey': 'abcdefg', + 'rev': '3kf...', + }, + } + action = _event_to_action(delete, action_id=2) + + assert action is not None + assert action.action_name == 'delete_post' + assert action.data['commit']['operation'] == 'delete' + assert 'record' not in action.data['commit'] + + +def test_identity_event_passes_through(): + identity = { + 'did': 'did:plc:xyz', + 'time_us': 1714500000000001, + 'kind': 'identity', + 'identity': {'did': 'did:plc:xyz', 'handle': 'someone.bsky.social', 'seq': 100}, + } + action = _event_to_action(identity, action_id=3) + + assert action is not None + assert action.action_name == 'identity' + assert action.data['identity']['handle'] == 'someone.bsky.social' + + +def test_account_event_is_skipped(): + account = { + 'did': 'did:plc:xyz', + 'time_us': 1714500000000002, + 'kind': 'account', + 'account': {'active': True, 'did': 'did:plc:xyz', 'seq': 100}, + } + assert _event_to_action(account, action_id=4) is None + + +def test_event_without_time_us_is_skipped(): + bad = {k: v for k, v in SAMPLE_POST_COMMIT.items() if k != 'time_us'} + assert _event_to_action(bad, action_id=5) is None + + +def test_event_with_zero_time_us_is_skipped(): + zero_time = {**SAMPLE_POST_COMMIT, 'time_us': 0} + assert _event_to_action(zero_time, action_id=5) is None + + +def test_event_with_negative_time_us_is_skipped(): + neg_time = {**SAMPLE_POST_COMMIT, 'time_us': -1} + assert _event_to_action(neg_time, action_id=5) is None + + +def test_unknown_kind_is_skipped(): + weird = {'did': 'did:plc:xyz', 'time_us': 1, 'kind': 'something-new'} + assert _event_to_action(weird, action_id=6) is None + + +@pytest.mark.parametrize( + 'collection,operation,expected_name', + [ + ('app.bsky.feed.post', 'create', 'create_post'), + ('app.bsky.feed.post', 'update', 'update_post'), + ('app.bsky.feed.post', 'delete', 'delete_post'), + ('app.bsky.feed.like', 'create', 'create_like'), + ('app.bsky.feed.like', 'delete', 'delete_like'), + ('app.bsky.feed.repost', 'create', 'create_repost'), + ('app.bsky.feed.repost', 'delete', 'delete_repost'), + ('app.bsky.graph.follow', 'create', 'create_follow'), + ('app.bsky.graph.follow', 'delete', 'delete_follow'), + ('app.bsky.actor.profile', 'update', 'update_profile'), + ], +) +def test_commit_action_name_combines_operation_and_collection(collection, operation, expected_name): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 'collection': collection, 'operation': operation}, + } + action = _event_to_action(event, action_id=1) + assert action is not None + assert action.action_name == expected_name + + +def test_commit_for_unmapped_collection_is_skipped(): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 'collection': 'app.bsky.graph.starterpack'}, + } + assert _event_to_action(event, action_id=1) is None + + +def test_commit_with_unexpected_operation_is_skipped(): + event = { + **SAMPLE_POST_COMMIT, + 'commit': {**SAMPLE_POST_COMMIT['commit'], 'operation': 'wat'}, + } + assert _event_to_action(event, action_id=1) is None + + +def test_build_url_includes_wanted_collections(): + stream = JetStreamInputStream( + endpoint='wss://example.com/sub', + wanted_collections=['app.bsky.feed.post', 'app.bsky.feed.like'], + ) + url = stream._build_url() + assert 'wss://example.com/sub' in url + assert 'wantedCollections=app.bsky.feed.post' in url + assert 'wantedCollections=app.bsky.feed.like' in url + assert 'cursor=' not in url + + +def test_build_url_includes_cursor_when_last_time_us_set(): + stream = JetStreamInputStream( + endpoint='wss://example.com/sub', + wanted_collections=['app.bsky.feed.post'], + ) + stream._last_time_us = 1714500000000000 + url = stream._build_url() + assert 'wss://example.com/sub' in url + assert 'wantedCollections=app.bsky.feed.post' in url + assert 'cursor=1714500000000000' in url + + +def test_stream_one_connection_handles_malformed_json_and_gracefully_closes(): + class FakeWebSocketApp: + def __init__(self, url, on_open=None, on_message=None, on_close=None, on_error=None): + self.url = url + self.on_open = on_open + self.on_message = on_message + self.on_close = on_close + self.closed = False + + def run_forever(self, **kwargs): + self.on_open(self) + self.on_message(self, b'not-json') + self.on_message( + self, + b'{"did": "did:plc:x", "time_us": 1714500000000000, "kind": "commit",' + b' "commit": {"operation": "create", "collection": "app.bsky.feed.post",' + b' "rkey": "abc", "rev": "3kf...", "cid": "bafy...",' + b' "record": {"$type": "app.bsky.feed.post", "text": "hello"}}}', + ) + self.on_close(self, 1000, 'normal') + + def close(self): + self.closed = True + + captured = {} + + def factory(*args, **kwargs): + captured['app'] = FakeWebSocketApp(*args, **kwargs) + return captured['app'] + + stream = JetStreamInputStream(reconnect_seconds=0.1) + # Pre-fill so _next_action_id() doesn't hit the snowflake-id-worker. + stream._snowflake_buffer = [12345] + + with mock.patch('websocket.WebSocketApp', factory): + actions = list(stream._stream_one_connection('wss://example.com/sub')) + + assert len(actions) == 1 + assert actions[0]._item.action_id == 12345 + assert actions[0]._item.action_name == 'create_post' + assert actions[0]._item.data['did'] == 'did:plc:x' + assert captured['app'].closed diff --git a/example_atproto_rules/config/labels.yaml b/example_atproto_rules/config/labels.yaml new file mode 100644 index 00000000..799d5495 --- /dev/null +++ b/example_atproto_rules/config/labels.yaml @@ -0,0 +1,5 @@ +labels: + test-poster: + valid_for: [UserId] + connotation: neutral + description: ATProto user who posted a record containing the word 'test' diff --git a/example_atproto_rules/config/ui_config.yaml b/example_atproto_rules/config/ui_config.yaml new file mode 100644 index 00000000..8b1d6036 --- /dev/null +++ b/example_atproto_rules/config/ui_config.yaml @@ -0,0 +1,12 @@ +ui_config: + default_summary_features: + - actions: ['*'] + features: [UserId] + - actions: ['create_post', 'update_post'] + features: [PostText, Rkey] + - actions: ['delete_post'] + features: [Rkey] + - actions: ['create_like', 'delete_like', 'create_repost', 'delete_repost', 'create_follow', 'delete_follow'] + features: [Subject, Rkey] + - actions: ['identity'] + features: [IdentityHandle] diff --git a/example_atproto_rules/main.sml b/example_atproto_rules/main.sml new file mode 100644 index 00000000..63227315 --- /dev/null +++ b/example_atproto_rules/main.sml @@ -0,0 +1,10 @@ +Import( + rules=[ + 'models/base.sml', + 'models/identity.sml', + 'models/record/base.sml', + 'models/record/post.sml', + ], +) + +Require(rule='rules/index.sml') diff --git a/example_atproto_rules/models/base.sml b/example_atproto_rules/models/base.sml new file mode 100644 index 00000000..cafc322b --- /dev/null +++ b/example_atproto_rules/models/base.sml @@ -0,0 +1,20 @@ +ActionName = GetActionName() + +UserId: Entity[str] = EntityJson( + type='UserId', + path='$.did', + required=False, +) + +OperationKind: Optional[str] = JsonData( + path='$.commit.operation', + required=False, +) + +IsOperation = OperationKind != None + +Second: int = 1 +Minute: int = Second * 60 +Hour: int = Minute * 60 +Day: int = Hour * 24 +Week: int = Day * 7 diff --git a/example_atproto_rules/models/identity.sml b/example_atproto_rules/models/identity.sml new file mode 100644 index 00000000..73be9cce --- /dev/null +++ b/example_atproto_rules/models/identity.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +IdentityHandle: str = JsonData( + path='$.identity.handle', + required=False, +) diff --git a/example_atproto_rules/models/record/base.sml b/example_atproto_rules/models/record/base.sml new file mode 100644 index 00000000..f768b31a --- /dev/null +++ b/example_atproto_rules/models/record/base.sml @@ -0,0 +1,37 @@ +Import(rules=['models/base.sml']) + +IsCreate = OperationKind == 'create' +IsUpdate = OperationKind == 'update' +IsDelete = OperationKind == 'delete' + +Collection: str = JsonData( + path='$.commit.collection', + required=False, +) + +Rkey: str = JsonData( + path='$.commit.rkey', + required=False, +) + +Cid: str = JsonData( + path='$.commit.cid', + required=False, +) + +# Some collections (like, repost) have `record.subject = {uri, cid}`; others (follow) +# have `record.subject = "did:plc:..."`. Prefer the URI when present, else fall back to +# the raw subject value (coerced to str — for follows that's the DID; for the dict shape +# the URI path resolves first so the coerced repr is never used). +SubjectUri: Optional[str] = JsonData( + path='$.commit.record.subject.uri', + required=False, +) + +SubjectRaw: Optional[str] = JsonData( + path='$.commit.record.subject', + required=False, + coerce_type=True, +) + +Subject: str = ResolveOptional(optional_value=SubjectUri, default_value=SubjectRaw) diff --git a/example_atproto_rules/models/record/post.sml b/example_atproto_rules/models/record/post.sml new file mode 100644 index 00000000..405eda0c --- /dev/null +++ b/example_atproto_rules/models/record/post.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +PostText: str = JsonData( + path='$.commit.record.text', + required=False, +) diff --git a/example_atproto_rules/rules/index.sml b/example_atproto_rules/rules/index.sml new file mode 100644 index 00000000..113cb722 --- /dev/null +++ b/example_atproto_rules/rules/index.sml @@ -0,0 +1,6 @@ +Import(rules=['models/base.sml']) + +Require( + rule='rules/record/index.sml', + require_if=IsOperation, +) diff --git a/example_atproto_rules/rules/record/index.sml b/example_atproto_rules/rules/record/index.sml new file mode 100644 index 00000000..0fabb9d5 --- /dev/null +++ b/example_atproto_rules/rules/record/index.sml @@ -0,0 +1,11 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + ], +) + +Require( + rule='rules/record/post/index.sml', + require_if=(IsCreate or IsUpdate) and Collection == 'app.bsky.feed.post', +) diff --git a/example_atproto_rules/rules/record/post/index.sml b/example_atproto_rules/rules/record/post/index.sml new file mode 100644 index 00000000..1d84f15e --- /dev/null +++ b/example_atproto_rules/rules/record/post/index.sml @@ -0,0 +1,9 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + 'models/record/post.sml', + ], +) + +Require(rule='rules/record/post/post_contains_test.sml') diff --git a/example_atproto_rules/rules/record/post/post_contains_test.sml b/example_atproto_rules/rules/record/post/post_contains_test.sml new file mode 100644 index 00000000..886f068c --- /dev/null +++ b/example_atproto_rules/rules/record/post/post_contains_test.sml @@ -0,0 +1,21 @@ +Import( + rules=[ + 'models/base.sml', + 'models/record/base.sml', + 'models/record/post.sml', + ], +) + +PostContainsTestRule = Rule( + when_all=[ + TextContains(text=PostText, phrase='test'), + ], + description='ATProto post contains the word "test"', +) + +WhenRules( + rules_any=[PostContainsTestRule], + then=[ + LabelAdd(entity=UserId, label='test-poster'), + ], +) diff --git a/osprey_worker/Dockerfile b/osprey_worker/Dockerfile index 39f9aa22..b2ee0f89 100644 --- a/osprey_worker/Dockerfile +++ b/osprey_worker/Dockerfile @@ -35,10 +35,11 @@ ADD LICENSE.md /osprey/LICENSE.md ADD osprey_rpc/pyproject.toml /osprey/osprey_rpc/pyproject.toml ADD osprey_worker/pyproject.toml /osprey/osprey_worker/pyproject.toml ADD example_plugins/pyproject.toml /osprey/example_plugins/pyproject.toml +ADD example_atproto_plugins/pyproject.toml /osprey/example_atproto_plugins/pyproject.toml # Create minimal package structure required by uv -RUN mkdir -p /osprey/osprey_worker /osprey/osprey_rpc /osprey/example_plugins/src && \ - touch /osprey/osprey_worker/__init__.py /osprey/osprey_rpc/__init__.py /osprey/example_plugins/src/__init__.py +RUN mkdir -p /osprey/osprey_worker /osprey/osprey_rpc /osprey/example_plugins/src /osprey/example_atproto_plugins/src/atproto_plugin && \ + touch /osprey/osprey_worker/__init__.py /osprey/osprey_rpc/__init__.py /osprey/example_plugins/src/__init__.py /osprey/example_atproto_plugins/src/atproto_plugin/__init__.py # Install dependencies first (this layer will be cached when only source code changes) RUN pip install --upgrade pip uv && \ @@ -50,9 +51,11 @@ RUN . .venv/bin/activate && update-tld-names # Add source code after dependencies are installed ADD example_rules /osprey/example_rules +ADD example_atproto_rules /osprey/example_atproto_rules ADD osprey_worker /osprey/osprey_worker ADD osprey_rpc /osprey/osprey_rpc ADD example_plugins /osprey/example_plugins +ADD example_atproto_plugins /osprey/example_atproto_plugins COPY entrypoint.sh /osprey/entrypoint.sh diff --git a/pyproject.toml b/pyproject.toml index 503445bb..df826a29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,12 +133,14 @@ nostril = { url = "https://github.com/casics/nostril/archive/v1.2.0.tar.gz" } osprey_rpc = { workspace = true } osprey_worker = { workspace = true } example_plugins = { workspace = true } +example_atproto_plugins = { workspace = true } [tool.uv.workspace] members = [ "osprey_rpc", "osprey_worker", "example_plugins", + "example_atproto_plugins", ] @@ -171,10 +173,10 @@ ignore = [ ] [tool.ruff.lint.isort] -known-first-party = ["osprey_worker", "osprey_rpc", "example_plugins"] +known-first-party = ["osprey_worker", "osprey_rpc", "example_plugins", "example_atproto_plugins"] [tool.fawltydeps] -code = ["osprey_worker/src", "osprey_rpc/src", "example_plugins/src"] +code = ["osprey_worker/src", "osprey_rpc/src", "example_plugins/src", "example_atproto_plugins/src"] deps = ["pyproject.toml"] ignore_unused = [ # Type stubs: used by mypy, never imported directly @@ -239,6 +241,7 @@ mypy_path = [ "osprey_rpc/src", "osprey_worker/src", "example_plugins/src", + "example_atproto_plugins/src", ] # Strict mode includes the following flags. When these are all True, they can be replaced with strict mode. diff --git a/run-atproto.sh b/run-atproto.sh new file mode 100755 index 00000000..ba299324 --- /dev/null +++ b/run-atproto.sh @@ -0,0 +1,5 @@ +#!/bin/bash +# Run Osprey against the live Bluesky JetStream firehose by stacking +# docker-compose.atproto.yaml on top of the main compose file. +set -e +exec docker compose -f docker-compose.yaml -f docker-compose.atproto.yaml "${@:-up}" diff --git a/uv.lock b/uv.lock index d410aafb..4c8915c1 100644 --- a/uv.lock +++ b/uv.lock @@ -21,6 +21,7 @@ resolution-markers = [ [manifest] members = [ + "example-atproto-plugins", "example-plugins", "osprey-rpc", "osprey-worker", @@ -495,6 +496,21 @@ dependencies = [ { name = "six" }, ] +[[package]] +name = "example-atproto-plugins" +version = "0.1.0" +source = { editable = "example_atproto_plugins" } +dependencies = [ + { name = "pluggy" }, + { name = "websocket-client" }, +] + +[package.metadata] +requires-dist = [ + { name = "pluggy", specifier = "==1.5.0" }, + { name = "websocket-client", specifier = "==1.8.0" }, +] + [[package]] name = "example-plugins" version = "0.1.0" @@ -2554,6 +2570,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fd/84/fd2ba7aafacbad3c4201d395674fc6348826569da3c0937e75505ead3528/wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859", size = 34166, upload-time = "2024-01-06T02:10:55.763Z" }, ] +[[package]] +name = "websocket-client" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e6/30/fba0d96b4b5fbf5948ed3f4681f7da2f9f64512e1d303f94b4cc174c24a5/websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da", size = 54648, upload-time = "2024-04-23T22:16:16.976Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/84/44687a29792a70e111c5c477230a72c4b957d88d16141199bf9acb7537a3/websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526", size = 58826, upload-time = "2024-04-23T22:16:14.422Z" }, +] + [[package]] name = "werkzeug" version = "1.0.1"