From 4e0a20722dc3f10186bed3747c981bfc148c3f98 Mon Sep 17 00:00:00 2001 From: Robert Date: Sun, 10 May 2026 14:15:13 +0200 Subject: [PATCH 1/2] Add SSE change notifications --- docs/src/SUMMARY.md | 1 + docs/src/usage/change-notifications.md | 43 +++++++ docs/src/usage/docker-images.md | 2 + server/src/api/add_version.rs | 8 ++ server/src/api/events.rs | 148 +++++++++++++++++++++++++ server/src/api/mod.rs | 9 ++ server/src/args.rs | 36 ++++++ server/src/web.rs | 3 + 8 files changed, 250 insertions(+) create mode 100644 docs/src/usage/change-notifications.md create mode 100644 server/src/api/events.rs diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 3ce3908..6b08617 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -5,6 +5,7 @@ - [Docker Compose](./usage/docker-compose.md) - [Docker Images](./usage/docker-images.md) - [Binaries](./usage/binaries.md) + - [Change Notifications](./usage/change-notifications.md) - [Integration](./integration.md) - [Pre-built Images](./integration/pre-built.md) - [Rust Crates](./integration/crates.md) diff --git a/docs/src/usage/change-notifications.md b/docs/src/usage/change-notifications.md new file mode 100644 index 0000000..d03aabb --- /dev/null +++ b/docs/src/usage/change-notifications.md @@ -0,0 +1,43 @@ +# Change Notifications + +The HTTP server exposes `GET /v1/client/events` as a Server-Sent Events stream. +Like other client endpoints, the request must include `X-Client-Id`. + +This endpoint is disabled by default. Enable it with `--sync-events` or the +`SYNC_EVENTS=true` environment variable. + +When `AddVersion` accepts a new version for that client, the stream emits a +`version` event: + +```text +event: version +data: {"clientId":"...","versionId":"..."} +``` + +This endpoint is only an invalidation signal. Clients should perform a normal +TaskChampion sync after receiving an event. + +## Simple Listener + +This example runs a command for every received `version` event. + +```bash +#!/usr/bin/env bash +set -euo pipefail + +server_url="${TASKCHAMPION_SYNC_SERVER_URL:?set TASKCHAMPION_SYNC_SERVER_URL}" +client_id="${TASKCHAMPION_SYNC_CLIENT_ID:?set TASKCHAMPION_SYNC_CLIENT_ID}" + +curl -fsSN \ + -H "Accept: text/event-stream" \ + -H "X-Client-Id: ${client_id}" \ + "${server_url%/}/v1/client/events" | +while IFS= read -r line; do + case "${line}" in + data:*) + echo "TaskChampion changed: ${line#data: }" + task sync + ;; + esac +done +``` diff --git a/docs/src/usage/docker-images.md b/docs/src/usage/docker-images.md index 09a4c78..511d622 100644 --- a/docs/src/usage/docker-images.md +++ b/docs/src/usage/docker-images.md @@ -27,6 +27,8 @@ empty to allow all clients. - `CREATE_CLIENTS` (default `true`) - if true, automatically create clients on first sync. If this is set to false, it is up to you to initialize clients in the DB. +- `SYNC_EVENTS` (default `false`) - if true, enable the Server-Sent Events +change notification endpoint at `/v1/client/events`. ### Example diff --git a/server/src/api/add_version.rs b/server/src/api/add_version.rs index 2e79c7d..6013988 100644 --- a/server/src/api/add_version.rs +++ b/server/src/api/add_version.rs @@ -74,6 +74,7 @@ pub(crate) async fn service( rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high")); } }; + server_state.changes.notify(client_id, version_id); Ok(rb.finish()) } Ok((AddVersionResult::ExpectedParentVersion(parent_version_id), _)) => { @@ -106,6 +107,7 @@ mod test { web::{WebConfig, WebServer}, }; use actix_web::{http::StatusCode, test, App}; + use futures::StreamExt; use pretty_assertions::assert_eq; use taskchampion_sync_server_core::{InMemoryStorage, ServerConfig, Storage}; use uuid::Uuid; @@ -125,6 +127,7 @@ mod test { } let server = WebServer::new(ServerConfig::default(), WebConfig::default(), storage); + let mut changes = server.server_state.changes.subscribe(client_id); let app = App::new().configure(|sc| server.config(sc)); let app = test::init_service(app).await; @@ -145,6 +148,11 @@ mod test { // the passed parent version ID, at least let new_version_id = resp.headers().get("X-Version-Id").unwrap(); assert!(new_version_id != &version_id.to_string()); + let new_version_id = Uuid::parse_str(new_version_id.to_str().unwrap()).unwrap(); + + let event = changes.next().await.unwrap(); + assert_eq!(event.client_id, client_id); + assert_eq!(event.version_id, new_version_id); // Shapshot should be requested, since there is no existing snapshot let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap(); diff --git a/server/src/api/events.rs b/server/src/api/events.rs new file mode 100644 index 0000000..8a2a192 --- /dev/null +++ b/server/src/api/events.rs @@ -0,0 +1,148 @@ +use crate::api::{ServerState, CLIENT_ID_HEADER}; +use actix_web::{error, get, http::header, web, HttpRequest, HttpResponse, Result}; +use futures::{ + channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, + StreamExt, +}; +use serde::Serialize; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use taskchampion_sync_server_core::{ClientId, VersionId}; + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct ChangeEvent { + pub(crate) client_id: ClientId, + pub(crate) version_id: VersionId, +} + +#[derive(Clone, Default)] +pub(crate) struct ChangeNotifier { + subscribers: Arc>>>>, +} + +impl ChangeNotifier { + pub(crate) fn subscribe(&self, client_id: ClientId) -> UnboundedReceiver { + let (tx, rx) = unbounded(); + self.subscribers + .lock() + .expect("change notifier mutex poisoned") + .entry(client_id) + .or_default() + .push(tx); + rx + } + + pub(crate) fn notify(&self, client_id: ClientId, version_id: VersionId) { + let event = ChangeEvent { + client_id, + version_id, + }; + let mut subscribers = self + .subscribers + .lock() + .expect("change notifier mutex poisoned"); + if let Some(client_subscribers) = subscribers.get_mut(&client_id) { + client_subscribers + .retain(|subscriber| subscriber.unbounded_send(event.clone()).is_ok()); + } + } +} + +#[get("/v1/client/events")] +pub(crate) async fn service( + req: HttpRequest, + server_state: web::Data>, +) -> Result { + if !server_state.web_config.sync_events { + return Err(error::ErrorNotFound("sync events are not enabled")); + } + + let client_id = server_state.client_id_header(&req)?; + let stream = server_state.changes.subscribe(client_id).map(|event| { + let json = serde_json::to_string(&event).expect("change event serializes"); + Ok::<_, actix_web::Error>(web::Bytes::from(format!( + "event: version\n\ + data: {json}\n\ + \n" + ))) + }); + + Ok(HttpResponse::Ok() + .append_header((header::CONTENT_TYPE, "text/event-stream")) + .append_header((header::CACHE_CONTROL, "no-store, max-age=0")) + .append_header((header::CONNECTION, "keep-alive")) + .append_header((CLIENT_ID_HEADER, client_id.to_string())) + .streaming(stream)) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::web::{WebConfig, WebServer}; + use actix_web::{http::StatusCode, test, App}; + use taskchampion_sync_server_core::{InMemoryStorage, ServerConfig}; + use uuid::Uuid; + + #[actix_rt::test] + async fn notifier_delivers_events_for_matching_client() { + let notifier = ChangeNotifier::default(); + let client_id = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let mut rx = notifier.subscribe(client_id); + + notifier.notify(client_id, version_id); + let event = rx.next().await.unwrap(); + assert_eq!(event.client_id, client_id); + assert_eq!(event.version_id, version_id); + } + + #[actix_rt::test] + async fn events_endpoint_uses_client_id_header() { + let client_id = Uuid::new_v4(); + let server = WebServer::new( + ServerConfig::default(), + WebConfig { + sync_events: true, + ..WebConfig::default() + }, + InMemoryStorage::new(), + ); + let app = App::new().configure(|sc| server.config(sc)); + let app = test::init_service(app).await; + + let req = test::TestRequest::get() + .uri("/v1/client/events") + .append_header((CLIENT_ID_HEADER, client_id.to_string())) + .to_request(); + let resp = test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers().get(header::CONTENT_TYPE).unwrap(), + "text/event-stream" + ); + } + + #[actix_rt::test] + async fn events_endpoint_is_disabled_by_default() { + let client_id = Uuid::new_v4(); + let server = WebServer::new( + ServerConfig::default(), + WebConfig::default(), + InMemoryStorage::new(), + ); + let app = App::new().configure(|sc| server.config(sc)); + let app = test::init_service(app).await; + + let req = test::TestRequest::get() + .uri("/v1/client/events") + .append_header((CLIENT_ID_HEADER, client_id.to_string())) + .to_request(); + let resp = test::call_service(&app, req).await; + + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } +} diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index b57fcab..39c53c4 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -5,9 +5,12 @@ use crate::web::WebConfig; mod add_snapshot; mod add_version; +mod events; mod get_child_version; mod get_snapshot; +pub(crate) use events::ChangeNotifier; + /// The content-type for history segments (opaque blobs of bytes) pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment"; @@ -31,6 +34,7 @@ pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request"; pub(crate) struct ServerState { pub(crate) server: Server, pub(crate) web_config: WebConfig, + pub(crate) changes: ChangeNotifier, } impl ServerState { @@ -60,6 +64,7 @@ pub(crate) fn api_scope() -> Scope { .service(add_version::service) .service(get_snapshot::service) .service(add_snapshot::service) + .service(events::service) } /// Convert a `anyhow::Error` to an Actix ISE @@ -89,8 +94,10 @@ mod test { web_config: WebConfig { client_id_allowlist: None, create_clients: true, + sync_events: false, ..WebConfig::default() }, + changes: ChangeNotifier::default(), }; let req = actix_web::test::TestRequest::default() .insert_header((CLIENT_ID_HEADER, client_id.to_string())) @@ -107,8 +114,10 @@ mod test { web_config: WebConfig { client_id_allowlist: Some([client_id_ok].into()), create_clients: true, + sync_events: false, ..WebConfig::default() }, + changes: ChangeNotifier::default(), }; let req = actix_web::test::TestRequest::default() .insert_header((CLIENT_ID_HEADER, client_id_ok.to_string())) diff --git a/server/src/args.rs b/server/src/args.rs index 033053b..61f3ba6 100644 --- a/server/src/args.rs +++ b/server/src/args.rs @@ -34,6 +34,13 @@ pub fn command() -> Command { .action(ArgAction::SetFalse) .required(false), ) + .arg( + arg!(--"sync-events" "Enable Server-Sent Events change notifications at /v1/client/events") + .env("SYNC_EVENTS") + .default_value("false") + .action(ArgAction::SetTrue) + .required(false), + ) .arg( arg!(--"snapshot-versions" "Target number of versions between snapshots") .value_parser(value_parser!(u32)) @@ -63,6 +70,7 @@ pub fn web_config_from_matches(matches: &ArgMatches) -> WebConfig { .get_many("allow-client-id") .map(|ids| ids.copied().collect()), create_clients: matches.get_one("create-clients").copied().unwrap_or(true), + sync_events: matches.get_one("sync-events").copied().unwrap_or(false), listen_addresses: matches .get_many::("listen") .unwrap() @@ -282,6 +290,34 @@ mod test { }); } + #[test] + fn command_sync_events_default() { + with_var_unset("SYNC_EVENTS", || { + let matches = command().get_matches_from(["tss", "--listen", "localhost:8080"]); + let web_config = web_config_from_matches(&matches); + assert_eq!(web_config.sync_events, false); + }); + } + + #[test] + fn command_sync_events_cmdline() { + with_var_unset("SYNC_EVENTS", || { + let matches = + command().get_matches_from(["tss", "--listen", "localhost:8080", "--sync-events"]); + let web_config = web_config_from_matches(&matches); + assert_eq!(web_config.sync_events, true); + }); + } + + #[test] + fn command_sync_events_env_true() { + with_var("SYNC_EVENTS", Some("true"), || { + let matches = command().get_matches_from(["tss", "--listen", "localhost:8080"]); + let web_config = web_config_from_matches(&matches); + assert_eq!(web_config.sync_events, true); + }); + } + #[actix_rt::test] async fn test_index_get() { let server = WebServer::new( diff --git a/server/src/web.rs b/server/src/web.rs index 3ff95c7..2688aa4 100644 --- a/server/src/web.rs +++ b/server/src/web.rs @@ -22,6 +22,7 @@ fn print_error(res: ServiceResponse) -> actix_web::Result>, pub create_clients: bool, + pub sync_events: bool, pub listen_addresses: Vec, } @@ -30,6 +31,7 @@ impl Default for WebConfig { Self { client_id_allowlist: Default::default(), create_clients: true, + sync_events: false, listen_addresses: vec![], } } @@ -57,6 +59,7 @@ impl WebServer { server_state: Arc::new(ServerState { server: Server::new(config, storage), web_config, + changes: Default::default(), }), } } From 868b4a60e2e263b3dfd1d399868cf1993675cd46 Mon Sep 17 00:00:00 2001 From: Robert Date: Sun, 10 May 2026 16:05:50 +0200 Subject: [PATCH 2/2] Remove version ID from SSE events --- docs/src/usage/change-notifications.md | 2 +- server/src/api/add_version.rs | 4 +--- server/src/api/events.rs | 14 ++++---------- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/docs/src/usage/change-notifications.md b/docs/src/usage/change-notifications.md index d03aabb..576746a 100644 --- a/docs/src/usage/change-notifications.md +++ b/docs/src/usage/change-notifications.md @@ -11,7 +11,7 @@ When `AddVersion` accepts a new version for that client, the stream emits a ```text event: version -data: {"clientId":"...","versionId":"..."} +data: {"clientId":"..."} ``` This endpoint is only an invalidation signal. Clients should perform a normal diff --git a/server/src/api/add_version.rs b/server/src/api/add_version.rs index 6013988..baedb4d 100644 --- a/server/src/api/add_version.rs +++ b/server/src/api/add_version.rs @@ -74,7 +74,7 @@ pub(crate) async fn service( rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high")); } }; - server_state.changes.notify(client_id, version_id); + server_state.changes.notify(client_id); Ok(rb.finish()) } Ok((AddVersionResult::ExpectedParentVersion(parent_version_id), _)) => { @@ -148,11 +148,9 @@ mod test { // the passed parent version ID, at least let new_version_id = resp.headers().get("X-Version-Id").unwrap(); assert!(new_version_id != &version_id.to_string()); - let new_version_id = Uuid::parse_str(new_version_id.to_str().unwrap()).unwrap(); let event = changes.next().await.unwrap(); assert_eq!(event.client_id, client_id); - assert_eq!(event.version_id, new_version_id); // Shapshot should be requested, since there is no existing snapshot let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap(); diff --git a/server/src/api/events.rs b/server/src/api/events.rs index 8a2a192..5d39baf 100644 --- a/server/src/api/events.rs +++ b/server/src/api/events.rs @@ -9,13 +9,12 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -use taskchampion_sync_server_core::{ClientId, VersionId}; +use taskchampion_sync_server_core::ClientId; #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub(crate) struct ChangeEvent { pub(crate) client_id: ClientId, - pub(crate) version_id: VersionId, } #[derive(Clone, Default)] @@ -35,11 +34,8 @@ impl ChangeNotifier { rx } - pub(crate) fn notify(&self, client_id: ClientId, version_id: VersionId) { - let event = ChangeEvent { - client_id, - version_id, - }; + pub(crate) fn notify(&self, client_id: ClientId) { + let event = ChangeEvent { client_id }; let mut subscribers = self .subscribers .lock() @@ -90,13 +86,11 @@ mod test { async fn notifier_delivers_events_for_matching_client() { let notifier = ChangeNotifier::default(); let client_id = Uuid::new_v4(); - let version_id = Uuid::new_v4(); let mut rx = notifier.subscribe(client_id); - notifier.notify(client_id, version_id); + notifier.notify(client_id); let event = rx.next().await.unwrap(); assert_eq!(event.client_id, client_id); - assert_eq!(event.version_id, version_id); } #[actix_rt::test]