Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,21 +1605,6 @@ async fn tokio_main() -> Result<()> {
// their sessions stripped when they return to the pool.
removed_channels.insert(ch);
typing_channels.remove(&ch);
// Best-effort: clean up 👀 on drained events.
// Note: the relay revokes membership before
// emitting the notification, so this DELETE may
// 403 on non-open channels. Stale 👀 in that
// case is a known limitation — fix belongs in
// the relay (clean up bot reactions on removal).
if !drained_ids.is_empty() {
let rc = ctx.rest_client.clone();
let ids = drained_ids.clone();
tokio::spawn(async move {
for eid in &ids {
pool::reaction_remove(&rc, eid, "👀").await;
}
});
}
if !drained_ids.is_empty() || invalidated > 0 {
tracing::info!(
channel_id = %ch,
Expand Down Expand Up @@ -1783,24 +1768,13 @@ async fn tokio_main() -> Result<()> {
// Capture author pubkey before queue.push() moves
// buzz_event.event (needed for mode gate below).
let author_hex = buzz_event.event.pubkey.to_hex();
let event_id_hex = buzz_event.event.id.to_hex();
let accepted = queue.push(QueuedEvent {
channel_id: buzz_event.channel_id,
event: buzz_event.event,
received_at: std::time::Instant::now(),
prompt_tag,
});
// 👀 — immediate "seen" reaction, only if the event
// was actually queued (not dropped by DedupMode::Drop).
// Fire-and-forget: on rare fast-failure paths the
// guard's cleanup may race with this add, leaving a
// cosmetic stale 👀. Acceptable — see ReactionGuard docs.
if accepted {
let rc = ctx.rest_client.clone();
tokio::spawn(async move {
pool::reaction_add(&rc, &event_id_hex, "👀").await;
});
}
// ── Multiple-event-handling mode gate ─────────────
// Event is already queued. If mode requires it AND
// the channel has an in-flight task, fire cancel.
if accepted && queue.is_channel_in_flight(buzz_event.channel_id) {
Expand Down
292 changes: 5 additions & 287 deletions crates/buzz-acp/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,15 +775,7 @@ pub async fn run_prompt_task(
}),
);

// Collects event IDs up front. On drop (any exit path — normal, early
// return, or panic), spawns best-effort cleanup of both 👀 and 💬.
// See `ReactionGuard` docs for ordering guarantees and known edge cases.
let reaction_ids: Vec<String> = batch
.as_ref()
.map(|b| b.events.iter().map(|be| be.event.id.to_hex()).collect())
.unwrap_or_default();
let _reaction_guard = ReactionGuard::new(ctx.rest_client.clone(), reaction_ids.clone());

// ── Turn completion guard ─────────────────────────────────────────────
// Emits `turn_completed` on any exit path. Captures observer handle and
// metadata now, before the agent is moved into PromptResult.
let _turn_guard = TurnCompletionGuard::new(
Expand Down Expand Up @@ -1134,16 +1126,7 @@ pub async fn run_prompt_task(
return;
};

// 💬 — fire-and-forget so the prompt fires immediately.
// The guard's cleanup (spawned on drop) removes 💬 after the turn completes.
// A brief race where 💬 appears slightly after the agent starts is acceptable.
if !reaction_ids.is_empty() {
let rest = ctx.rest_client.clone();
let ids = reaction_ids.clone();
tokio::spawn(async move {
react_working(&rest, &ids).await;
});
}
// ── Send the actual prompt ────────────────────────────────────────────

// Slash-command pass-through sends the bare command as the first text
// block (so connector detection fires), then each prompt section as its
Expand Down Expand Up @@ -1452,7 +1435,6 @@ pub async fn run_prompt_task(
});
}
}
// _reaction_guard drops here → spawns clear_reactions for all exit paths.
}

/// Retry wrapper for context fetches: one retry with `CONTEXT_FETCH_RETRY_DELAY`
Expand Down Expand Up @@ -2060,67 +2042,7 @@ fn log_stop_reason(source: &PromptSource, stop_reason: &StopReason) {
}
}

//
// Two-phase lifecycle visible to users:
// 👀 "seen" — event was queued and an agent will handle it
// 💬 "working" — agent is actively prompting
//
// 💬 is awaited inline in `run_prompt_task` before the prompt fires, so
// add-before-remove ordering is structural. 👀 is fire-and-forget from
// `main.rs` at queue-push time for immediate responsiveness; on rare
// fast-failure paths the guard's cleanup may race with the 👀 add,
// leaving a cosmetic stale 👀 (see `ReactionGuard` docs).
//
// Cleanup is fire-and-forget via `ReactionGuard` (spawned on drop).
// Failures are debug-logged and ignored — reactions are cosmetic.

/// Drop guard that spawns reaction cleanup on any exit path.
///
/// Created at the top of `run_prompt_task`. On drop — normal return, early
/// return, or panic — spawns fire-and-forget removal of both 👀 and 💬.
///
/// ## Ordering
///
/// 💬 (`react_working`) is fire-and-forget (spawned before the prompt fires).
/// A brief race where 💬 appears slightly after the agent starts is acceptable.
///
/// 👀 (`react_seen`) is fire-and-forget from `main.rs` at queue-push time.
/// On rare fast-failure paths (e.g., `session_new` error on an idle agent),
/// the cleanup spawn may race with the 👀 add, leaving a stale 👀. This is
/// accepted as a cosmetic edge case — the message will be retried and the
/// stale 👀 is harmless.
struct ReactionGuard {
rest: Option<crate::relay::RestClient>,
ids: Vec<String>,
}

impl ReactionGuard {
fn new(rest: crate::relay::RestClient, ids: Vec<String>) -> Self {
Self {
rest: if ids.is_empty() { None } else { Some(rest) },
ids,
}
}
}

impl Drop for ReactionGuard {
fn drop(&mut self) {
// Guard against drop outside a tokio runtime (e.g., in unit tests or
// during process teardown before the runtime is fully initialized).
// `run_prompt_task` is always spawned via `JoinSet::spawn`, so a
// runtime handle is normally available; `try_current` is the safe
// fallback for the rare cases it isn't.
if let Some(rest) = self.rest.take() {
let ids = std::mem::take(&mut self.ids);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(clear_reactions(rest, ids));
}
// If no runtime is available, reactions are left as-is — they are
// cosmetic indicators and the stale state is harmless.
}
}
}

// ── Turn liveness emission ───────────────────────────────────────────────────
// Periodically emits a `turn_liveness` observer event while a turn is in-flight,
// so the desktop can prune turns whose host died without unwinding (kill -9 /
// crash) far sooner than the no-activity backstop. Runs as a non-resolving
Expand Down Expand Up @@ -2205,178 +2127,7 @@ impl Drop for TurnCompletionGuard {
}
}

const REACTION_SEEN: &str = "👀";
const REACTION_WORKING: &str = "💬";

/// Best-effort timeout for a single reaction REST call.
const REACTION_TIMEOUT: Duration = Duration::from_millis(500);

/// Percent-encode a string for use in a URL path segment (used in tests only).
#[cfg(test)]
fn pct_encode(s: &str) -> String {
let mut out = String::with_capacity(s.len() * 3);
for byte in s.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(byte as char);
}
_ => {
use std::fmt::Write;
let _ = write!(out, "%{byte:02X}");
}
}
}
out
}

/// Best-effort: add a reaction via a signed Nostr kind-7 event (NIP-25).
///
/// Builds a reaction event with `buzz_sdk::build_reaction`, signs it with
/// the keys already stored in `RestClient`, and submits via `POST /events`.
/// Returns immediately on timeout or any error — reactions are cosmetic.
pub(crate) async fn reaction_add(rest: &crate::relay::RestClient, event_id: &str, emoji: &str) {
let target_id = match nostr::EventId::from_hex(event_id) {
Ok(id) => id,
Err(e) => {
tracing::debug!(event_id, emoji, "reaction add: invalid event ID: {e}");
return;
}
};
let builder = match buzz_sdk::build_reaction(target_id, emoji) {
Ok(b) => b,
Err(e) => {
tracing::warn!(event_id, emoji, "reaction add: build failed: {e}");
return;
}
};
let event = match builder.sign_with_keys(&rest.keys) {
Ok(e) => e,
Err(e) => {
tracing::warn!(event_id, emoji, "reaction add: sign failed: {e}");
return;
}
};
match tokio::time::timeout(REACTION_TIMEOUT, rest.submit_event(&event)).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => tracing::debug!(event_id, emoji, "reaction add failed: {e}"),
Err(_) => tracing::debug!(event_id, emoji, "reaction add timed out"),
}
}

/// Best-effort: remove a reaction via a signed kind:5 (NIP-09) deletion event.
///
/// Queries kind:7 reactions by our pubkey targeting the event, finds the matching
/// emoji, then submits a signed kind:5 deletion via `POST /events`.
/// Returns immediately on timeout or any error — reactions are cosmetic.
pub(crate) async fn reaction_remove(rest: &crate::relay::RestClient, event_id: &str, emoji: &str) {
use nostr::{Alphabet, SingleLetterTag};

// Step 1: query our kind:7 reactions targeting this event.
let my_pubkey = rest.keys.public_key();
let e_tag = SingleLetterTag::lowercase(Alphabet::E);
let filter = nostr::Filter::new()
.kind(nostr::Kind::Reaction)
.author(my_pubkey)
.custom_tags(e_tag, [event_id]);

let resp = match tokio::time::timeout(Duration::from_millis(1_000), rest.query(&[filter])).await
{
Ok(Ok(v)) => v,
Ok(Err(e)) => {
tracing::debug!(event_id, emoji, "reaction remove: query failed: {e}");
return;
}
Err(_) => {
tracing::debug!(event_id, emoji, "reaction remove: query timed out");
return;
}
};

// Find our reaction event with matching emoji content.
let reid = resp.as_array().and_then(|events| {
events.iter().find_map(|ev| {
let content = ev.get("content")?.as_str()?;
if content != emoji {
return None;
}
ev.get("id")?.as_str().map(|s| s.to_string())
})
});

let reid = match reid {
Some(id) => id,
None => {
tracing::debug!(event_id, emoji, "reaction remove: no reaction event found");
return;
}
};

// Step 2: build and submit a signed kind:5 deletion for the reaction event.
let target_id = match nostr::EventId::from_hex(&reid) {
Ok(id) => id,
Err(e) => {
tracing::debug!(
event_id,
emoji,
"reaction remove: invalid reaction event ID: {e}"
);
return;
}
};
let builder = match buzz_sdk::build_remove_reaction(target_id) {
Ok(b) => b,
Err(e) => {
tracing::warn!(event_id, emoji, "reaction remove: build failed: {e}");
return;
}
};
let event = match builder.sign_with_keys(&rest.keys) {
Ok(e) => e,
Err(e) => {
tracing::warn!(event_id, emoji, "reaction remove: sign failed: {e}");
return;
}
};
match tokio::time::timeout(Duration::from_millis(1_000), rest.submit_event(&event)).await {
Ok(Ok(_)) => {}
Ok(Err(e)) => tracing::debug!(event_id, emoji, "reaction remove failed: {e}"),
Err(_) => tracing::debug!(event_id, emoji, "reaction remove timed out"),
}
}

/// Maximum concurrent reaction HTTP requests per fan-out call.
/// Prevents unbounded parallelism when a large batch of events arrives.
const REACTION_CONCURRENCY: usize = 10;

/// Add 💬 to all events, capped at `REACTION_CONCURRENCY` concurrent requests.
/// Awaited inline before the prompt fires.
async fn react_working(rest: &crate::relay::RestClient, event_ids: &[String]) {
for chunk in event_ids.chunks(REACTION_CONCURRENCY) {
futures_util::future::join_all(
chunk
.iter()
.map(|eid| reaction_add(rest, eid, REACTION_WORKING)),
)
.await;
}
}

/// Fire-and-forget: remove both 👀 and 💬 from all events. Spawned on turn complete.
/// Capped at `REACTION_CONCURRENCY` concurrent requests per chunk to avoid
/// unbounded HTTP fan-out on large batches.
async fn clear_reactions(rest: crate::relay::RestClient, event_ids: Vec<String>) {
// Each event needs two removals (👀 and 💬); pair them and chunk by
// REACTION_CONCURRENCY pairs so the total concurrent requests stay bounded.
for chunk in event_ids.chunks(REACTION_CONCURRENCY) {
futures_util::future::join_all(chunk.iter().flat_map(|eid| {
[
reaction_remove(&rest, eid, REACTION_SEEN),
reaction_remove(&rest, eid, REACTION_WORKING),
]
}))
.await;
}
}
// ─── Unit Tests ──────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -2844,40 +2595,7 @@ mod tests {
assert_eq!(msg.pubkey, "unknown");
}

#[test]
fn test_pct_encode_hex_passthrough() {
let hex = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2";
assert_eq!(pct_encode(hex), hex);
}

#[test]
fn test_pct_encode_emoji() {
// 👀 = U+1F440 = F0 9F 91 80 in UTF-8
assert_eq!(pct_encode("👀"), "%F0%9F%91%80");
}

#[test]
fn test_pct_encode_emoji_speech_balloon() {
// 💬 = U+1F4AC = F0 9F 92 AC in UTF-8
assert_eq!(pct_encode("💬"), "%F0%9F%92%AC");
}

#[test]
fn test_pct_encode_empty() {
assert_eq!(pct_encode(""), "");
}

#[test]
fn test_pct_encode_unreserved_passthrough() {
assert_eq!(pct_encode("AZaz09-_.~"), "AZaz09-_.~");
}

#[test]
fn test_pct_encode_reserved_chars() {
assert_eq!(pct_encode("/"), "%2F");
assert_eq!(pct_encode("+"), "%2B");
assert_eq!(pct_encode(" "), "%20");
}
// ── SessionState tests ───────────────────────────────────────────────

fn make_state() -> (SessionState, Uuid, Uuid) {
let ch_a = Uuid::new_v4();
Expand Down
Loading