Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Duroxide-PG Copilot Instructions

## Important Guidelines

> **DO NOT** commit, push to remote, or publish to crates.io unless explicitly asked by the user.
>
> Always wait for explicit user confirmation before:
> - Running `git commit`
> - Pushing commits to remote branches
> - Creating pull requests
> - Publishing to crates.io
> - Any other action that affects version control or external systems

## Project Overview
This is a **PostgreSQL provider** for [Duroxide](https://github.com/affandar/duroxide), a durable task orchestration framework for Rust. It implements the `Provider` and `ProviderAdmin` traits, storing orchestration state, history, and work queues in PostgreSQL using stored procedures.

Expand Down
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.13] - 2026-01-07

### Fixed

- `cleanup_schema()` now drops all stored procedures in addition to tables (fixes #4)
- Previously only dropped tables, leaving stored procedures orphaned in public schema
- Essential for test isolation when using public schema (DROP SCHEMA CASCADE only runs for custom schemas)
- Adds DROP FUNCTION statements for all 22 stored procedures
- `prune_executions_bulk()` now includes running instances, matching SQLite behavior
- Running instances may have old ContinuedAsNew executions that need pruning
- The underlying `prune_executions()` safely skips the current execution regardless of status

### Changed

- Update to duroxide 0.1.11 (bug fix for version extraction during completion-only replay)
- No breaking changes for provider implementations

### Notes

- Total validation tests: 99 (unchanged)

## [0.1.12] - 2026-01-06

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [".", "pg-stress"]

[package]
name = "duroxide-pg"
version = "0.1.12"
version = "0.1.13"
edition = "2021"
authors = ["Affan Dar <affandar@gmail.com>"]
description = "A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework"
Expand All @@ -28,7 +28,7 @@ exclude = [
]

[dependencies]
duroxide = { version = "0.1.10", features = ["provider-test"] }
duroxide = { version = "0.1.11", features = ["provider-test"] }
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono"], default-features = false }
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ let provider = PostgresProvider::new_with_schema(

## Features

- Automatic schema migration on startup
- **Automatic schema migration** on startup with version tracking - see [migrations/README.md](migrations/README.md)
- Connection pooling via sqlx
- Custom schema support for multi-tenant isolation
- Full implementation of the Duroxide `Provider` and `ProviderAdmin` traits
- Poison message detection with attempt count tracking
- Lock renewal for long-running orchestrations and activities

## Latest Release (0.1.12)
## Latest Release (0.1.13)

- Fix migration system to handle function signature changes (DROP before CREATE)
- Resolves test failures when running against existing public schema
- 135 provider validation tests passing
- Fix `cleanup_schema()` to drop stored procedures in public schema (fixes #4)
- Update to duroxide 0.1.11 (bug fix release, no breaking changes)
- 99 provider validation tests passing
- See [CHANGELOG.md](CHANGELOG.md) for full version history

## License
Expand Down
40 changes: 37 additions & 3 deletions migrations/0002_create_stored_procedures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,59 @@ BEGIN
-- ============================================================================

-- Procedure: cleanup_schema
-- Drops all tables in the schema (for testing only)
-- SAFETY: Never drops the "public" schema itself, only tables within it
-- Drops all tables AND stored procedures in the schema (for testing only)
-- SAFETY: Never drops the "public" schema itself, only objects within it
-- NOTE: Function drops are essential for public schema cleanup since
-- DROP SCHEMA CASCADE only runs for non-public schemas
EXECUTE format('DROP FUNCTION IF EXISTS %I.cleanup_schema()', v_schema_name);

EXECUTE format('
CREATE OR REPLACE FUNCTION %I.cleanup_schema()
RETURNS VOID AS $cleanup$
BEGIN
-- Drop tables first
DROP TABLE IF EXISTS %I.instances CASCADE;
DROP TABLE IF EXISTS %I.executions CASCADE;
DROP TABLE IF EXISTS %I.history CASCADE;
DROP TABLE IF EXISTS %I.orchestrator_queue CASCADE;
DROP TABLE IF EXISTS %I.worker_queue CASCADE;
DROP TABLE IF EXISTS %I.instance_locks CASCADE;
DROP TABLE IF EXISTS %I._duroxide_migrations CASCADE;

-- Drop all stored procedures (required for public schema cleanup)
DROP FUNCTION IF EXISTS %I.cleanup_schema();
DROP FUNCTION IF EXISTS %I.list_instances();
DROP FUNCTION IF EXISTS %I.list_executions(TEXT);
DROP FUNCTION IF EXISTS %I.latest_execution_id(TEXT);
DROP FUNCTION IF EXISTS %I.list_instances_by_status(TEXT);
DROP FUNCTION IF EXISTS %I.get_instance_info(TEXT);
DROP FUNCTION IF EXISTS %I.get_execution_info(TEXT, BIGINT);
DROP FUNCTION IF EXISTS %I.get_system_metrics();
DROP FUNCTION IF EXISTS %I.get_queue_depths(BIGINT);
DROP FUNCTION IF EXISTS %I.enqueue_worker_work(TEXT);
DROP FUNCTION IF EXISTS %I.ack_worker(TEXT, TEXT, TEXT);
DROP FUNCTION IF EXISTS %I.renew_work_item_lock(TEXT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS %I.fetch_work_item(BIGINT, BIGINT);
DROP FUNCTION IF EXISTS %I.abandon_work_item(TEXT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS %I.enqueue_orchestrator_work(TEXT, TEXT, TIMESTAMPTZ, TEXT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS %I.fetch_orchestration_item(BIGINT, BIGINT);
DROP FUNCTION IF EXISTS %I.ack_orchestration_item(TEXT, BIGINT, JSONB, JSONB, JSONB, JSONB);
DROP FUNCTION IF EXISTS %I.abandon_orchestration_item(TEXT, BIGINT);
DROP FUNCTION IF EXISTS %I.abandon_orchestration_item(TEXT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS %I.renew_orchestration_item_lock(TEXT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS %I.fetch_history(TEXT);
DROP FUNCTION IF EXISTS %I.fetch_history_with_execution(TEXT, BIGINT);
DROP FUNCTION IF EXISTS %I.append_history(TEXT, BIGINT, JSONB);
END;
$cleanup$ LANGUAGE plpgsql;
', v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name);
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name, v_schema_name,
v_schema_name, v_schema_name, v_schema_name);

-- ============================================================================
-- Simple Query Procedures (Phase 3)
Expand Down
16 changes: 15 additions & 1 deletion migrations/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
# Migration System

This project uses a custom migration system for PostgreSQL schema management. Migrations are stored as SQL files in the `migrations/` directory and are automatically applied when the provider is initialized.
This project uses a custom migration system for PostgreSQL schema management with **automatic versioning and tracking**. Migrations are stored as SQL files in the `migrations/` directory and are automatically applied when the provider is initialized.

## Schema Versioning

duroxide-pg tracks schema versions automatically:

- Each migration has a **version number** (extracted from filename, e.g., `0001_initial_schema.sql` → version 1)
- Applied migrations are recorded in a **`_duroxide_migrations` table** within each schema
- The migration runner **automatically applies pending migrations** on provider startup
- **Multi-tenant safe**: Each PostgreSQL schema has its own independent migration history

This means you can:
- Deploy new versions without manual schema updates
- Roll forward safely (new migrations apply automatically)
- Inspect migration history via `SELECT * FROM {schema}._duroxide_migrations`

## Migration Files

Expand Down
7 changes: 5 additions & 2 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1618,14 +1618,17 @@ impl ProviderAdmin for PostgresProvider {
filter: InstanceFilter,
options: PruneOptions,
) -> Result<PruneResult, ProviderError> {
// Build query to find matching instances in terminal states
// Find matching instances (all statuses - prune_executions protects current execution)
// Note: We include Running instances because long-running orchestrations (e.g., with
// ContinueAsNew) may have old executions that need pruning. The underlying prune_executions
// call safely skips the current execution regardless of its status.
let mut sql = format!(
r#"
SELECT i.instance_id
FROM {}.instances i
LEFT JOIN {}.executions e ON i.instance_id = e.instance_id
AND i.current_execution_id = e.execution_id
WHERE e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
WHERE 1=1
"#,
self.schema_name, self.schema_name
);
Expand Down
Loading