Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [Docker Compose](./usage/docker-compose.md)
- [Docker Images](./usage/docker-images.md)
- [Binaries](./usage/binaries.md)
- [Change Notifications](./usage/change-notifications.md)
- [Integration](./integration.md)
- [Pre-built Images](./integration/pre-built.md)
- [Rust Crates](./integration/crates.md)
Expand Down
43 changes: 43 additions & 0 deletions docs/src/usage/change-notifications.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Change Notifications

The HTTP server exposes `GET /v1/client/events` as a Server-Sent Events stream.
Like other client endpoints, the request must include `X-Client-Id`.

This endpoint is disabled by default. Enable it with `--sync-events` or the
`SYNC_EVENTS=true` environment variable.

When `AddVersion` accepts a new version for that client, the stream emits a
`version` event:

```text
event: version
data: {"clientId":"..."}
```

This endpoint is only an invalidation signal. Clients should perform a normal
TaskChampion sync after receiving an event.

## Simple Listener

This example runs a command for every received `version` event.

```bash
#!/usr/bin/env bash
set -euo pipefail

server_url="${TASKCHAMPION_SYNC_SERVER_URL:?set TASKCHAMPION_SYNC_SERVER_URL}"
client_id="${TASKCHAMPION_SYNC_CLIENT_ID:?set TASKCHAMPION_SYNC_CLIENT_ID}"

curl -fsSN \
-H "Accept: text/event-stream" \
-H "X-Client-Id: ${client_id}" \
"${server_url%/}/v1/client/events" |
while IFS= read -r line; do
case "${line}" in
data:*)
echo "TaskChampion changed: ${line#data: }"
task sync
;;
esac
done
```
2 changes: 2 additions & 0 deletions docs/src/usage/docker-images.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ empty to allow all clients.
- `CREATE_CLIENTS` (default `true`) - if true, automatically create clients on
first sync. If this is set to false, it is up to you to initialize clients in
the DB.
- `SYNC_EVENTS` (default `false`) - if true, enable the Server-Sent Events
change notification endpoint at `/v1/client/events`.

### Example

Expand Down
6 changes: 6 additions & 0 deletions server/src/api/add_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub(crate) async fn service(
rb.append_header((SNAPSHOT_REQUEST_HEADER, "urgency=high"));
}
};
server_state.changes.notify(client_id);
Ok(rb.finish())
}
Ok((AddVersionResult::ExpectedParentVersion(parent_version_id), _)) => {
Expand Down Expand Up @@ -106,6 +107,7 @@ mod test {
web::{WebConfig, WebServer},
};
use actix_web::{http::StatusCode, test, App};
use futures::StreamExt;
use pretty_assertions::assert_eq;
use taskchampion_sync_server_core::{InMemoryStorage, ServerConfig, Storage};
use uuid::Uuid;
Expand All @@ -125,6 +127,7 @@ mod test {
}

let server = WebServer::new(ServerConfig::default(), WebConfig::default(), storage);
let mut changes = server.server_state.changes.subscribe(client_id);
let app = App::new().configure(|sc| server.config(sc));
let app = test::init_service(app).await;

Expand All @@ -146,6 +149,9 @@ mod test {
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());

let event = changes.next().await.unwrap();
assert_eq!(event.client_id, client_id);

// Shapshot should be requested, since there is no existing snapshot
let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap();
assert_eq!(snapshot_request, "urgency=high");
Expand Down
142 changes: 142 additions & 0 deletions server/src/api/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use crate::api::{ServerState, CLIENT_ID_HEADER};
use actix_web::{error, get, http::header, web, HttpRequest, HttpResponse, Result};
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
StreamExt,
};
use serde::Serialize;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use taskchampion_sync_server_core::ClientId;

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ChangeEvent {
pub(crate) client_id: ClientId,
}

#[derive(Clone, Default)]
pub(crate) struct ChangeNotifier {
subscribers: Arc<Mutex<HashMap<ClientId, Vec<UnboundedSender<ChangeEvent>>>>>,
}

impl ChangeNotifier {
pub(crate) fn subscribe(&self, client_id: ClientId) -> UnboundedReceiver<ChangeEvent> {
let (tx, rx) = unbounded();
self.subscribers
.lock()
.expect("change notifier mutex poisoned")
.entry(client_id)
.or_default()
.push(tx);
rx
}

pub(crate) fn notify(&self, client_id: ClientId) {
let event = ChangeEvent { client_id };
let mut subscribers = self
.subscribers
.lock()
.expect("change notifier mutex poisoned");
if let Some(client_subscribers) = subscribers.get_mut(&client_id) {
client_subscribers
.retain(|subscriber| subscriber.unbounded_send(event.clone()).is_ok());
}
}
}

#[get("/v1/client/events")]
pub(crate) async fn service(
req: HttpRequest,
server_state: web::Data<Arc<ServerState>>,
) -> Result<HttpResponse> {
if !server_state.web_config.sync_events {
return Err(error::ErrorNotFound("sync events are not enabled"));
}

let client_id = server_state.client_id_header(&req)?;
let stream = server_state.changes.subscribe(client_id).map(|event| {
let json = serde_json::to_string(&event).expect("change event serializes");
Ok::<_, actix_web::Error>(web::Bytes::from(format!(
"event: version\n\
data: {json}\n\
\n"
)))
});

Ok(HttpResponse::Ok()
.append_header((header::CONTENT_TYPE, "text/event-stream"))
.append_header((header::CACHE_CONTROL, "no-store, max-age=0"))
.append_header((header::CONNECTION, "keep-alive"))
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.streaming(stream))
}

#[cfg(test)]
mod test {
use super::*;
use crate::web::{WebConfig, WebServer};
use actix_web::{http::StatusCode, test, App};
use taskchampion_sync_server_core::{InMemoryStorage, ServerConfig};
use uuid::Uuid;

#[actix_rt::test]
async fn notifier_delivers_events_for_matching_client() {
let notifier = ChangeNotifier::default();
let client_id = Uuid::new_v4();
let mut rx = notifier.subscribe(client_id);

notifier.notify(client_id);
let event = rx.next().await.unwrap();
assert_eq!(event.client_id, client_id);
}

#[actix_rt::test]
async fn events_endpoint_uses_client_id_header() {
let client_id = Uuid::new_v4();
let server = WebServer::new(
ServerConfig::default(),
WebConfig {
sync_events: true,
..WebConfig::default()
},
InMemoryStorage::new(),
);
let app = App::new().configure(|sc| server.config(sc));
let app = test::init_service(app).await;

let req = test::TestRequest::get()
.uri("/v1/client/events")
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&app, req).await;

assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_TYPE).unwrap(),
"text/event-stream"
);
}

#[actix_rt::test]
async fn events_endpoint_is_disabled_by_default() {
let client_id = Uuid::new_v4();
let server = WebServer::new(
ServerConfig::default(),
WebConfig::default(),
InMemoryStorage::new(),
);
let app = App::new().configure(|sc| server.config(sc));
let app = test::init_service(app).await;

let req = test::TestRequest::get()
.uri("/v1/client/events")
.append_header((CLIENT_ID_HEADER, client_id.to_string()))
.to_request();
let resp = test::call_service(&app, req).await;

assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
}
9 changes: 9 additions & 0 deletions server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use crate::web::WebConfig;

mod add_snapshot;
mod add_version;
mod events;
mod get_child_version;
mod get_snapshot;

pub(crate) use events::ChangeNotifier;

/// The content-type for history segments (opaque blobs of bytes)
pub(crate) const HISTORY_SEGMENT_CONTENT_TYPE: &str =
"application/vnd.taskchampion.history-segment";
Expand All @@ -31,6 +34,7 @@ pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request";
pub(crate) struct ServerState {
pub(crate) server: Server,
pub(crate) web_config: WebConfig,
pub(crate) changes: ChangeNotifier,
}

impl ServerState {
Expand Down Expand Up @@ -60,6 +64,7 @@ pub(crate) fn api_scope() -> Scope {
.service(add_version::service)
.service(get_snapshot::service)
.service(add_snapshot::service)
.service(events::service)
}

/// Convert a `anyhow::Error` to an Actix ISE
Expand Down Expand Up @@ -89,8 +94,10 @@ mod test {
web_config: WebConfig {
client_id_allowlist: None,
create_clients: true,
sync_events: false,
..WebConfig::default()
},
changes: ChangeNotifier::default(),
};
let req = actix_web::test::TestRequest::default()
.insert_header((CLIENT_ID_HEADER, client_id.to_string()))
Expand All @@ -107,8 +114,10 @@ mod test {
web_config: WebConfig {
client_id_allowlist: Some([client_id_ok].into()),
create_clients: true,
sync_events: false,
..WebConfig::default()
},
changes: ChangeNotifier::default(),
};
let req = actix_web::test::TestRequest::default()
.insert_header((CLIENT_ID_HEADER, client_id_ok.to_string()))
Expand Down
36 changes: 36 additions & 0 deletions server/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub fn command() -> Command {
.action(ArgAction::SetFalse)
.required(false),
)
.arg(
arg!(--"sync-events" "Enable Server-Sent Events change notifications at /v1/client/events")
.env("SYNC_EVENTS")
.default_value("false")
.action(ArgAction::SetTrue)
.required(false),
)
.arg(
arg!(--"snapshot-versions" <NUM> "Target number of versions between snapshots")
.value_parser(value_parser!(u32))
Expand Down Expand Up @@ -63,6 +70,7 @@ pub fn web_config_from_matches(matches: &ArgMatches) -> WebConfig {
.get_many("allow-client-id")
.map(|ids| ids.copied().collect()),
create_clients: matches.get_one("create-clients").copied().unwrap_or(true),
sync_events: matches.get_one("sync-events").copied().unwrap_or(false),
listen_addresses: matches
.get_many::<String>("listen")
.unwrap()
Expand Down Expand Up @@ -282,6 +290,34 @@ mod test {
});
}

#[test]
fn command_sync_events_default() {
with_var_unset("SYNC_EVENTS", || {
let matches = command().get_matches_from(["tss", "--listen", "localhost:8080"]);
let web_config = web_config_from_matches(&matches);
assert_eq!(web_config.sync_events, false);
});
}

#[test]
fn command_sync_events_cmdline() {
with_var_unset("SYNC_EVENTS", || {
let matches =
command().get_matches_from(["tss", "--listen", "localhost:8080", "--sync-events"]);
let web_config = web_config_from_matches(&matches);
assert_eq!(web_config.sync_events, true);
});
}

#[test]
fn command_sync_events_env_true() {
with_var("SYNC_EVENTS", Some("true"), || {
let matches = command().get_matches_from(["tss", "--listen", "localhost:8080"]);
let web_config = web_config_from_matches(&matches);
assert_eq!(web_config.sync_events, true);
});
}

#[actix_rt::test]
async fn test_index_get() {
let server = WebServer::new(
Expand Down
3 changes: 3 additions & 0 deletions server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn print_error<B>(res: ServiceResponse<B>) -> actix_web::Result<ErrorHandlerResp
pub struct WebConfig {
pub client_id_allowlist: Option<HashSet<Uuid>>,
pub create_clients: bool,
pub sync_events: bool,
pub listen_addresses: Vec<String>,
}

Expand All @@ -30,6 +31,7 @@ impl Default for WebConfig {
Self {
client_id_allowlist: Default::default(),
create_clients: true,
sync_events: false,
listen_addresses: vec![],
}
}
Expand Down Expand Up @@ -57,6 +59,7 @@ impl WebServer {
server_state: Arc::new(ServerState {
server: Server::new(config, storage),
web_config,
changes: Default::default(),
}),
}
}
Expand Down
Loading