Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 135 additions & 108 deletions codex-rs/code-mode/src/cell_actor/mod.rs

Large diffs are not rendered by default.

213 changes: 150 additions & 63 deletions codex-rs/code-mode/src/cell_actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ async fn wait_for_notification(host: &RecordingHost) {
.expect("notification barrier timed out");
}

/// Blocks the calling test until `host.committed` is observed as `true`.
///
/// The flag is polled with `Ordering::Acquire`, yielding to the tokio
/// scheduler between polls so other tasks can make progress.
///
/// # Panics
///
/// Panics with "completion barrier timed out" if the flag is still unset
/// after one second.
async fn wait_for_completion(host: &RecordingHost) {
    let committed_barrier = async {
        loop {
            if host.committed.load(Ordering::Acquire) {
                break;
            }
            // Hand control back to the runtime before polling again.
            tokio::task::yield_now().await;
        }
    };

    tokio::time::timeout(Duration::from_secs(1), committed_barrier)
        .await
        .expect("completion barrier timed out");
}

#[tokio::test]
async fn completion_and_output_are_buffered_until_the_first_observation() {
let host = Arc::new(RecordingHost::default());
Expand Down Expand Up @@ -210,7 +220,12 @@ async fn completion_and_output_are_buffered_until_the_first_observation() {
async fn continuing_harness_advances_an_unobserved_pending_frontier() {
let host = Arc::new(RecordingHost::default());
let harness = spawn_cell_actor_harness_with_host(Arc::clone(&host));
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: Vec::new(),
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
Expand All @@ -220,16 +235,10 @@ async fn continuing_harness_advances_an_unobserved_pending_frontier() {
.unwrap();
wait_for_notification(&host).await;

loop {
match harness.runtime_control_rx.try_recv() {
Ok(RuntimeControlCommand::Continue) => break,
Ok(command) => panic!("expected continue, got {command:?}"),
Err(std_mpsc::TryRecvError::Empty) => tokio::task::yield_now().await,
Err(std_mpsc::TryRecvError::Disconnected) => {
panic!("runtime control channel disconnected")
}
}
}
assert!(matches!(
harness.runtime_control_rx.try_recv(),
Ok(RuntimeControlCommand::Continue)
));
let termination = harness.handle.terminate();
drop(harness.event_tx);
assert_eq!(
Expand All @@ -245,7 +254,12 @@ async fn continuing_harness_advances_an_unobserved_pending_frontier() {
async fn pending_frontier_is_buffered_while_runtime_commands_are_queued() {
let host = Arc::new(RecordingHost::default());
let harness = spawn_pausable_cell_actor_harness_with_host(Arc::clone(&host));
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: Vec::new(),
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
Expand All @@ -270,10 +284,11 @@ async fn pending_frontier_is_buffered_while_runtime_commands_are_queued() {
));
assert_eq!(
harness.handle.observe(ObserveMode::PendingFrontier).await,
Ok(CellEvent::Pending {
Ok(CellEvent::Pending(PendingFrontier {
generation: PendingGeneration::new(/*value*/ 1),
content_items: Vec::new(),
pending_tool_call_ids: Vec::new(),
})
}))
);
assert!(matches!(
harness.runtime_control_rx.try_recv(),
Expand All @@ -292,54 +307,75 @@ async fn pending_frontier_is_buffered_while_runtime_commands_are_queued() {
}

#[tokio::test]
async fn buffered_yield_observation_resumes_an_unobserved_pending_frontier() {
async fn termination_preserves_an_unobserved_pending_frontier() {
let host = Arc::new(RecordingHost::default());
let harness = spawn_pausable_cell_actor_harness_with_host(Arc::clone(&host));
harness.event_tx.send(RuntimeEvent::YieldRequested).unwrap();
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
harness
.event_tx
.send(RuntimeEvent::ContentItem(
FunctionCallOutputContentItem::InputText {
text: "unobserved".to_string(),
},
))
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: Vec::new(),
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
call_id: "notify-1".to_string(),
text: "pending processed".to_string(),
call_id: "pending-barrier".to_string(),
text: "barrier".to_string(),
})
.unwrap();
while !host.notified.load(Ordering::Acquire) {
tokio::task::yield_now().await;
}
wait_for_notification(&host).await;

let termination = harness.handle.terminate();
drop(harness.event_tx);
assert_eq!(
harness
.handle
.observe(ObserveMode::YieldAfter(Duration::from_secs(60)))
.await,
Ok(CellEvent::Yielded {
content_items: Vec::new(),
termination.await,
Ok(CellEvent::Terminated {
content_items: vec![OutputItem::Text {
text: "unobserved".to_string(),
}],
})
);
loop {
match harness.runtime_control_rx.try_recv() {
Ok(RuntimeControlCommand::Continue) => break,
Ok(command) => panic!("expected continue, got {command:?}"),
Err(std_mpsc::TryRecvError::Empty) => tokio::task::yield_now().await,
Err(std_mpsc::TryRecvError::Disconnected) => {
panic!("runtime control channel disconnected")
}
}
}
harness.task.await.unwrap();
}

host.notified.store(false, Ordering::Release);
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
#[tokio::test]
async fn repeated_pending_observation_does_not_resume_an_unobserved_frontier() {
let host = Arc::new(RecordingHost::default());
let harness = spawn_pausable_cell_actor_harness_with_host(Arc::clone(&host));
harness.event_tx.send(RuntimeEvent::YieldRequested).unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: Vec::new(),
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
call_id: "notify-2".to_string(),
text: "later pending processed".to_string(),
call_id: "notify-1".to_string(),
text: "pending processed".to_string(),
})
.unwrap();
while !host.notified.load(Ordering::Acquire) {
tokio::task::yield_now().await;
}

assert_eq!(
harness.handle.observe(ObserveMode::PendingFrontier).await,
Ok(CellEvent::Pending(PendingFrontier {
generation: PendingGeneration::new(/*value*/ 1),
content_items: Vec::new(),
pending_tool_call_ids: Vec::new(),
}))
);
assert!(matches!(
harness.runtime_control_rx.try_recv(),
Err(std_mpsc::TryRecvError::Empty)
Expand Down Expand Up @@ -425,7 +461,7 @@ async fn first_observation_preserves_a_yield_that_raced_with_creation() {
}

#[tokio::test]
async fn dropped_pending_observer_preserves_pre_observation_yield() {
async fn dropped_pending_observer_preserves_the_durable_frontier() {
let host = Arc::new(RecordingHost::default());
let harness = spawn_pausable_cell_actor_harness_with_host(Arc::clone(&host));
harness
Expand Down Expand Up @@ -461,7 +497,12 @@ async fn dropped_pending_observer_preserves_pre_observation_yield() {
Err(CellError::Busy)
);
drop(dropped_observation);
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: Vec::new(),
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
Expand All @@ -472,29 +513,27 @@ async fn dropped_pending_observer_preserves_pre_observation_yield() {
wait_for_notification(&host).await;

assert_eq!(
tokio::time::timeout(
Duration::from_secs(1),
harness
.handle
.observe(ObserveMode::YieldAfter(Duration::from_secs(60))),
)
.await
.expect("initial yield was not preserved after failed pending delivery"),
Ok(CellEvent::Yielded {
content_items: vec![OutputItem::Text {
text: "before".to_string(),
}],
})
harness.handle.observe(ObserveMode::PendingFrontier).await,
Ok(CellEvent::Pending(PendingFrontier {
generation: PendingGeneration::new(/*value*/ 1),
content_items: vec![
OutputItem::Text {
text: "before".to_string(),
},
OutputItem::Text {
text: "after".to_string(),
},
],
pending_tool_call_ids: Vec::new(),
}))
);

let termination = harness.handle.terminate();
drop(harness.event_tx);
assert_eq!(
termination.await,
Ok(CellEvent::Terminated {
content_items: vec![OutputItem::Text {
text: "after".to_string(),
}],
content_items: Vec::new(),
})
);
harness.task.await.unwrap();
Expand Down Expand Up @@ -861,7 +900,12 @@ async fn dropped_pending_observer_preserves_the_frontier_for_the_next_observatio
input: Some(serde_json::json!({})),
})
.unwrap();
harness.event_tx.send(RuntimeEvent::Pending).unwrap();
harness
.event_tx
.send(RuntimeEvent::Pending {
pending_tool_call_ids: vec!["tool-1".to_string()],
})
.unwrap();
harness
.event_tx
.send(RuntimeEvent::Notify {
Expand All @@ -873,10 +917,11 @@ async fn dropped_pending_observer_preserves_the_frontier_for_the_next_observatio

assert_eq!(
harness.handle.observe(ObserveMode::PendingFrontier).await,
Ok(CellEvent::Pending {
Ok(CellEvent::Pending(PendingFrontier {
generation: PendingGeneration::new(/*value*/ 1),
content_items: Vec::new(),
pending_tool_call_ids: vec!["tool-1".to_string()],
})
}))
);
assert!(matches!(
harness.runtime_control_rx.try_recv(),
Expand All @@ -894,6 +939,48 @@ async fn dropped_pending_observer_preserves_the_frontier_for_the_next_observatio
harness.task.await.unwrap();
}

/// Sends buffered output followed by a pending frontier, drops the event
/// sender before anything is observed, and checks that the next observation
/// reports `CellEvent::Completed` carrying the buffered output and the
/// "exec runtime ended unexpectedly" error text.
#[tokio::test]
async fn unexpected_runtime_loss_preserves_an_unobserved_pending_frontier() {
    let host = Arc::new(RecordingHost::default());
    let harness = spawn_pausable_cell_actor_harness_with_host(Arc::clone(&host));

    // Events are delivered in this exact order: output, pending frontier,
    // then a notification that the test waits on as a barrier.
    let queued_events = [
        RuntimeEvent::ContentItem(FunctionCallOutputContentItem::InputText {
            text: "pending output".to_string(),
        }),
        RuntimeEvent::Pending {
            pending_tool_call_ids: vec!["tool-1".to_string()],
        },
        RuntimeEvent::Notify {
            call_id: "pending-frontier-barrier".to_string(),
            text: "barrier".to_string(),
        },
    ];
    for event in queued_events {
        harness.event_tx.send(event).unwrap();
    }
    wait_for_notification(&host).await;

    // Dropping the sender closes the event channel without a completion
    // event; going by the test name, this models losing the runtime.
    drop(harness.event_tx);
    wait_for_completion(&host).await;

    let observed = harness.handle.observe(ObserveMode::PendingFrontier).await;
    assert_eq!(
        observed,
        Ok(CellEvent::Completed {
            content_items: vec![OutputItem::Text {
                text: "pending output".to_string(),
            }],
            error_text: Some("exec runtime ended unexpectedly".to_string()),
        })
    );

    harness.task.await.unwrap();
}

#[tokio::test]
async fn only_the_first_termination_claims_a_buffered_completion() {
let cell_state = CellState::new(CancellationToken::new());
Expand Down
Loading
Loading