diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 6de8461..356a66b 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -1,5 +1,16 @@ # Duroxide-PG Copilot Instructions +## Important Guidelines + +> **DO NOT** commit, push to remote, or publish to crates.io unless explicitly asked by the user. +> +> Always wait for explicit user confirmation before: +> - Running `git commit` +> - Pushing commits to remote branches +> - Creating pull requests +> - Publishing to crates.io +> - Any other action that affects version control or external systems + ## Project Overview This is a **PostgreSQL provider** for [Duroxide](https://github.com/affandar/duroxide), a durable task orchestration framework for Rust. It implements the `Provider` and `ProviderAdmin` traits, storing orchestration state, history, and work queues in PostgreSQL using stored procedures. diff --git a/CHANGELOG.md b/CHANGELOG.md index 35c9513..22c37cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.1.13] - 2026-01-07 + +### Fixed + +- `cleanup_schema()` now drops all stored procedures in addition to tables (fixes #4) + - Previously only dropped tables, leaving stored procedures orphaned in public schema + - Essential for test isolation when using public schema (DROP SCHEMA CASCADE only runs for custom schemas) + - Adds DROP FUNCTION statements for all 22 stored procedures +- `prune_executions_bulk()` now includes running instances, matching SQLite behavior + - Running instances may have old ContinuedAsNew executions that need pruning + - The underlying `prune_executions()` safely skips the current execution regardless of status + +### Changed + +- Update to duroxide 0.1.11 (bug fix for version extraction during completion-only replay) + - No breaking changes for provider implementations + +### Notes + +- Total validation tests: 99 (unchanged) + ## [0.1.12] - 2026-01-06 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 3a80489..2b6a320 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [".", "pg-stress"] [package] name = "duroxide-pg" -version = "0.1.12" +version = "0.1.13" edition = "2021" authors = ["Affan Dar "] description = "A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework" @@ -28,7 +28,7 @@ exclude = [ ] [dependencies] -duroxide = { version = "0.1.10", features = ["provider-test"] } +duroxide = { version = "0.1.11", features = ["provider-test"] } async-trait = "0.1" tokio = { version = "1", features = ["full"] } sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono"], default-features = false } diff --git a/README.md b/README.md index b87ed15..bb42db7 100644 --- a/README.md +++ b/README.md @@ -51,18 +51,18 @@ let provider = PostgresProvider::new_with_schema( ## Features -- Automatic schema migration on startup +- **Automatic schema migration** on startup with version tracking - see [migrations/README.md](migrations/README.md) - Connection pooling via sqlx - Custom schema support for multi-tenant isolation - Full implementation of the Duroxide `Provider` and `ProviderAdmin` traits - Poison message detection with attempt count tracking - Lock renewal for long-running orchestrations and activities -## Latest Release (0.1.12) +## Latest Release (0.1.13) -- Fix migration system to handle function signature changes (DROP before CREATE) -- Resolves test failures when running against existing public schema -- 135 provider validation tests passing +- Fix `cleanup_schema()` to drop stored procedures in public schema (fixes #4) +- Update to duroxide 0.1.11 (bug fix release, no breaking changes) +- 99 provider validation tests passing - See [CHANGELOG.md](CHANGELOG.md) for full version history ## License diff --git a/migrations/0002_create_stored_procedures.sql b/migrations/0002_create_stored_procedures.sql index 0034758..b416ba3 100644 --- a/migrations/0002_create_stored_procedures.sql +++ b/migrations/0002_create_stored_procedures.sql @@ -14,14 +14,17 @@ BEGIN -- ============================================================================ -- Procedure: cleanup_schema - -- Drops all tables in the schema (for testing only) - -- SAFETY: Never drops the "public" schema itself, only tables within it + -- Drops all tables AND stored procedures in the schema (for testing only) + -- SAFETY: Never drops the "public" schema itself, only objects within it + -- NOTE: Function drops are essential for public schema cleanup since + -- DROP SCHEMA CASCADE only runs for non-public schemas EXECUTE format('DROP FUNCTION IF EXISTS %I.cleanup_schema()', v_schema_name); EXECUTE format(' CREATE OR REPLACE FUNCTION %I.cleanup_schema() RETURNS VOID AS $cleanup$ BEGIN + -- Drop tables first DROP TABLE IF EXISTS %I.instances CASCADE; DROP TABLE IF EXISTS %I.executions CASCADE; DROP TABLE IF EXISTS %I.history CASCADE; @@ -29,10 +32,41 @@ BEGIN DROP TABLE IF EXISTS %I.worker_queue CASCADE; DROP TABLE IF EXISTS %I.instance_locks CASCADE; DROP TABLE IF EXISTS %I._duroxide_migrations CASCADE; + + -- Drop all stored procedures (required for public schema cleanup) + DROP FUNCTION IF EXISTS %I.cleanup_schema(); + DROP FUNCTION IF EXISTS %I.list_instances(); + DROP FUNCTION IF EXISTS %I.list_executions(TEXT); + DROP FUNCTION IF EXISTS %I.latest_execution_id(TEXT); + DROP FUNCTION IF EXISTS %I.list_instances_by_status(TEXT); + DROP FUNCTION IF EXISTS %I.get_instance_info(TEXT); + DROP FUNCTION IF EXISTS %I.get_execution_info(TEXT, BIGINT); + DROP FUNCTION IF EXISTS %I.get_system_metrics(); + DROP FUNCTION IF EXISTS %I.get_queue_depths(BIGINT); + DROP FUNCTION IF EXISTS %I.enqueue_worker_work(TEXT); + DROP FUNCTION IF EXISTS %I.ack_worker(TEXT, TEXT, TEXT); + DROP FUNCTION IF EXISTS %I.renew_work_item_lock(TEXT, BIGINT, BIGINT); + DROP FUNCTION IF EXISTS %I.fetch_work_item(BIGINT, BIGINT); + DROP FUNCTION IF EXISTS %I.abandon_work_item(TEXT, BIGINT, BOOLEAN); + DROP FUNCTION IF EXISTS %I.enqueue_orchestrator_work(TEXT, TEXT, TIMESTAMPTZ, TEXT, TEXT, BIGINT); + DROP FUNCTION IF EXISTS %I.fetch_orchestration_item(BIGINT, BIGINT); + DROP FUNCTION IF EXISTS %I.ack_orchestration_item(TEXT, BIGINT, JSONB, JSONB, JSONB, JSONB); + DROP FUNCTION IF EXISTS %I.abandon_orchestration_item(TEXT, BIGINT); + DROP FUNCTION IF EXISTS %I.abandon_orchestration_item(TEXT, BIGINT, BOOLEAN); + DROP FUNCTION IF EXISTS %I.renew_orchestration_item_lock(TEXT, BIGINT, BIGINT); + DROP FUNCTION IF EXISTS %I.fetch_history(TEXT); + DROP FUNCTION IF EXISTS %I.fetch_history_with_execution(TEXT, BIGINT); + DROP FUNCTION IF EXISTS %I.append_history(TEXT, BIGINT, JSONB); END; $cleanup$ LANGUAGE plpgsql; ', v_schema_name, v_schema_name, v_schema_name, v_schema_name, - v_schema_name, v_schema_name, v_schema_name, v_schema_name); + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name, v_schema_name, + v_schema_name, v_schema_name, v_schema_name); -- ============================================================================ -- Simple Query Procedures (Phase 3) diff --git a/migrations/README.md b/migrations/README.md index 6dcee3c..33e98f2 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -1,6 +1,20 @@ # Migration System -This project uses a custom migration system for PostgreSQL schema management. Migrations are stored as SQL files in the `migrations/` directory and are automatically applied when the provider is initialized. +This project uses a custom migration system for PostgreSQL schema management with **automatic versioning and tracking**. Migrations are stored as SQL files in the `migrations/` directory and are automatically applied when the provider is initialized. + +## Schema Versioning + +duroxide-pg tracks schema versions automatically: + +- Each migration has a **version number** (extracted from filename, e.g., `0001_initial_schema.sql` → version 1) +- Applied migrations are recorded in a **`_duroxide_migrations` table** within each schema +- The migration runner **automatically applies pending migrations** on provider startup +- **Multi-tenant safe**: Each PostgreSQL schema has its own independent migration history + +This means you can: +- Deploy new versions without manual schema updates +- Roll forward safely (new migrations apply automatically) +- Inspect migration history via `SELECT * FROM {schema}._duroxide_migrations` ## Migration Files diff --git a/src/provider.rs b/src/provider.rs index a9bfddb..91bbb36 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -1618,14 +1618,17 @@ impl ProviderAdmin for PostgresProvider { filter: InstanceFilter, options: PruneOptions, ) -> Result { - // Build query to find matching instances in terminal states + // Find matching instances (all statuses - prune_executions protects current execution) + // Note: We include Running instances because long-running orchestrations (e.g., with + // ContinueAsNew) may have old executions that need pruning. The underlying prune_executions + // call safely skips the current execution regardless of its status. let mut sql = format!( r#" SELECT i.instance_id FROM {}.instances i LEFT JOIN {}.executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id - WHERE e.status IN ('Completed', 'Failed', 'ContinuedAsNew') + WHERE 1=1 "#, self.schema_name, self.schema_name ); diff --git a/tests/regression_tests.rs b/tests/regression_tests.rs index b6499e0..5f12a46 100644 --- a/tests/regression_tests.rs +++ b/tests/regression_tests.rs @@ -290,3 +290,248 @@ async fn test_parallel_suborchestrations_stress() { "Expected all {NUM_PARENTS} to succeed, got {success}" ); } + +// ============================================================================= +// Bug: prune_executions_bulk skipping running instances +// ============================================================================= +// +// Reporter: duroxide-pg development team +// Date: January 7, 2026 +// Reference: duroxide issue #44 +// +// Summary: +// Long-running orchestrations that use ContinueAsNew (e.g., eternal orchestrations) +// accumulate many historical executions. The prune_executions_bulk API should +// allow pruning these old executions even while the orchestration is Running. +// +// There were TWO related bugs: +// 1. provider.rs: prune_executions_bulk was filtering to only terminal instances +// 2. migration 0010: prune_executions stored procedure had `AND e.status != 'Running'` +// +// Fix: +// 1. provider.rs: Changed WHERE clause from status filter to `WHERE 1=1` +// 2. migration 0010: Removed the `AND e.status != 'Running'` clause +// +// The current execution is already protected by `AND e.execution_id != v_current_execution_id` +// so the additional status check was redundant and prevented legitimate pruning. +// +// ============================================================================= + +/// Regression test for prune_executions_bulk including running instances. +/// +/// This test creates an orchestration that does multiple ContinueAsNew cycles +/// then pauses waiting for an external event. While in the Running state with +/// multiple historical executions, we call prune_executions_bulk and verify: +/// 1. The running instance IS processed (not skipped) +/// 2. Old executions ARE pruned +/// 3. The current (running) execution is NOT pruned +#[tokio::test] +async fn test_prune_bulk_includes_running_instances() { + use duroxide::providers::{InstanceFilter, PruneOptions}; + use std::sync::atomic::{AtomicU32, Ordering}; + + let schema = unique_schema_name(); + let database_url = get_database_url(); + + let provider = PostgresProvider::new_with_schema(&database_url, Some(&schema)) + .await + .expect("Failed to create provider"); + let store: Arc = Arc::new(provider); + + // Track which execution we're on + let execution_counter = Arc::new(AtomicU32::new(0)); + let execution_counter_clone = execution_counter.clone(); + + // Activity that signals execution number + let activity_registry = ActivityRegistry::builder() + .register( + "SignalExecution", + move |_ctx: ActivityContext, input: String| { + let counter = execution_counter_clone.clone(); + async move { + let exec_num: u32 = input.parse().unwrap_or(0); + counter.store(exec_num, Ordering::SeqCst); + Ok(format!("exec-{exec_num}")) + } + }, + ) + .build(); + + // Orchestration that does many ContinueAsNew cycles, pausing mid-chain + let orchestration_registry = OrchestrationRegistry::builder() + .register( + "LongRunningWithContinueAsNew", + |ctx: OrchestrationContext, count_str: String| async move { + let count: u32 = count_str.parse().unwrap_or(0); + + // Signal which execution we're on + ctx.schedule_activity("SignalExecution", count.to_string()) + .into_activity() + .await?; + + if count == 5 { + // On execution 5, wait for external signal to proceed + // This keeps us "Running" with multiple old executions + let _signal = ctx.schedule_wait("proceed").into_event().await; + } + + if count < 10 { + ctx.continue_as_new((count + 1).to_string()).await + } else { + Ok(format!("Final: {count}")) + } + }, + ) + .build(); + + let options = RuntimeOptions { + dispatcher_min_poll_interval: Duration::from_millis(50), + ..Default::default() + }; + + let rt = runtime::Runtime::start_with_options( + store.clone(), + Arc::new(activity_registry), + orchestration_registry, + options, + ) + .await; + + let client = Client::new(store.clone()); + + // Start the orchestration + client + .start_orchestration("bulk-prune-running", "LongRunningWithContinueAsNew", "0") + .await + .expect("Failed to start orchestration"); + + // Wait for execution 5 (will be waiting for external event) + let deadline = std::time::Instant::now() + Duration::from_secs(15); + while execution_counter.load(Ordering::SeqCst) < 5 { + if std::time::Instant::now() > deadline { + rt.shutdown(None).await; + let _ = cleanup_schema(&schema).await; + panic!("Orchestration never reached execution 5"); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Give a moment for the orchestration to reach the wait state + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify we're running with multiple executions + let info = client + .get_instance_info("bulk-prune-running") + .await + .expect("Failed to get instance info"); + assert_eq!( + info.status, "Running", + "Should be running and waiting for event" + ); + + let executions_before = client + .list_executions("bulk-prune-running") + .await + .expect("Failed to list executions"); + assert!( + executions_before.len() >= 5, + "Should have at least 5 executions, got {}", + executions_before.len() + ); + + // KEY TEST: Use prune_executions_bulk with no filter + // Previously this would skip running instances entirely + let result = client + .prune_executions_bulk( + InstanceFilter { + instance_ids: None, // All instances + completed_before: None, + limit: Some(100), + }, + PruneOptions { + keep_last: Some(2), + ..Default::default() + }, + ) + .await + .expect("prune_executions_bulk failed"); + + // Bulk prune should have processed our running instance + assert!( + result.instances_processed >= 1, + "Should process at least 1 instance (the running one)" + ); + + // Old executions should have been deleted + assert!( + result.executions_deleted >= 3, + "Should have pruned old executions, got {} deleted", + result.executions_deleted + ); + + // Verify the current execution (running) is still there + let info_after = client + .get_instance_info("bulk-prune-running") + .await + .expect("Failed to get instance info after prune"); + assert_eq!( + info_after.status, "Running", + "Instance should still be running" + ); + + // Should have at most 2 executions now + let executions_after = client + .list_executions("bulk-prune-running") + .await + .expect("Failed to list executions after prune"); + assert!( + executions_after.len() <= 2, + "Should have at most 2 executions after prune, got {}", + executions_after.len() + ); + assert!( + executions_after.contains(&info_after.current_execution_id), + "Current execution must be preserved" + ); + + // Resume the orchestration by sending the external event + client + .raise_event("bulk-prune-running", "proceed", "go!") + .await + .expect("Failed to raise event"); + + // Wait for completion + let deadline = std::time::Instant::now() + Duration::from_secs(15); + loop { + if let Ok(info) = client.get_instance_info("bulk-prune-running").await { + if info.status == "Completed" { + break; + } + } + if std::time::Instant::now() > deadline { + rt.shutdown(None).await; + let _ = cleanup_schema(&schema).await; + panic!("Orchestration never completed after event"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // Verify final state + let final_info = client + .get_instance_info("bulk-prune-running") + .await + .expect("Failed to get final instance info"); + assert_eq!(final_info.status, "Completed"); + assert!( + final_info + .output + .as_ref() + .map(|o| o.contains("Final: 10")) + .unwrap_or(false), + "Should complete with final value after event, got: {:?}", + final_info.output + ); + + rt.shutdown(None).await; + let _ = cleanup_schema(&schema).await; +}