Skip to content

Commit

Permalink
Add description of schema example
Browse files Browse the repository at this point in the history
  • Loading branch information
Johnabell committed Mar 25, 2024
1 parent dea2073 commit e249895
Showing 1 changed file with 71 additions and 19 deletions.
90 changes: 71 additions & 19 deletions examples/schema/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
//! This example demonstrates several stages of the evolution of a system.
//!
//! The first part for an event store with no schema (where the `Aggregate::Event` type implement
//! `Event`.
//!
//! The next evolution of the system is the introduction of the `Schema` to the event store. This
//! removes the need for `Aggregate::Event` to implement `Event` instead relies on the
//! `From<Aggregate::Event>` and `Into<Option<Aggregate::Event>>` on `Schema` and the fact that
//! `Schema` implements event.
//!
//! The next evolution demonstrates how an event can be deprecated using the `Schema` mechanism. A
//! new event variant is introduced and one is deprecated. The use of the schema enables the rest
//! of the system to not have to concern itself with the deprecated events. The only place it still
//! exists is as a variant of the `Schema` type.
//!
//! The final part is an alternative way for the system to evolve using the `Schema` mechanism as a
//! way to upcast from one shape of and event to another.
use std::marker::PhantomData;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::EventStore;
use esrs::Aggregate;
use esrs::{Aggregate, AggregateState};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -44,20 +61,20 @@ struct SchemaState<EventType> {

enum SchemaCommand {}

#[derive(Default, Debug)]
#[derive(Default)]
enum SchemaEventUpcasted {
#[default]
EmptyEvent,
EventB {
EventA {
contents: String,
count: u64,
},
EventC {
contents: String,
EventB {
count: u64,
},
}

#[derive(Default, Debug)]
#[derive(Default)]
enum SchemaEvent {
#[default]
EmptyEvent,
Expand All @@ -70,6 +87,21 @@ enum SchemaEvent {
},
}

#[derive(Default, Deserialize, Serialize)]
enum SchemaEventNoSchema {
#[default]
EmptyEvent,
EventA {
contents: String,
},
EventB {
count: u64,
},
}

#[cfg(feature = "upcasting")]
impl esrs::event::Upcaster for SchemaEventNoSchema {}

#[derive(Default)]
enum SchemaEventOld {
#[default]
Expand Down Expand Up @@ -108,7 +140,7 @@ impl From<SchemaEventUpcasted> for Schema {
match value {
SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent,
SchemaEventUpcasted::EventB { count } => Schema::EventB { count },
SchemaEventUpcasted::EventC { contents, count } => Schema::EventC { contents, count },
SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count },
}
}
}
Expand Down Expand Up @@ -149,9 +181,9 @@ impl From<Schema> for Option<SchemaEventUpcasted> {
fn from(val: Schema) -> Self {
match val {
Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent),
Schema::EventA { contents } => Some(SchemaEventUpcasted::EventC { contents, count: 1 }),
Schema::EventA { contents } => Some(SchemaEventUpcasted::EventA { contents, count: 1 }),
Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }),
Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventC { contents, count }),
Schema::EventC { contents, count } => Some(SchemaEventUpcasted::EventA { contents, count }),
}
}
}
Expand All @@ -161,7 +193,24 @@ async fn main() {
let pool = new_pool().await;
let aggregate_id: Uuid = Uuid::new_v4();

// Before deprecation of EventA
// Before the schema
let schemaless_store: PgStore<SchemaAggregate<SchemaEventNoSchema>> =
PgStoreBuilder::new(pool.clone()).try_build().await.unwrap();

let events = vec![
SchemaEventNoSchema::EmptyEvent,
SchemaEventNoSchema::EventA {
contents: "this is soon to be deprecated".to_owned(),
},
SchemaEventNoSchema::EventB { count: 42 },
];

let mut state = AggregateState::with_id(aggregate_id);
let events = schemaless_store.persist(&mut state, events).await.unwrap();

assert_eq!(events.len(), 3);

// With schema before deprecation of EventA
let old_store: PgStore<SchemaAggregate<SchemaEventOld>, Schema> = PgStoreBuilder::new(pool.clone())
.with_schema::<Schema>()
.try_build()
Expand All @@ -178,9 +227,11 @@ async fn main() {

let manager = AggregateManager::new(old_store.clone());
let mut state = manager.load(aggregate_id).await.unwrap().unwrap();
let events = old_store.persist(&mut state, events).await.unwrap();
let _ = old_store.persist(&mut state, events).await.unwrap();

assert_eq!(events.len(), 3);
let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events;

assert_eq!(events.len(), 6);

// After deprecation of EventA and addition of EventC
let new_store: PgStore<SchemaAggregate<SchemaEvent>, Schema> = PgStoreBuilder::new(pool.clone())
Expand All @@ -191,7 +242,7 @@ async fn main() {

let events = new_store.by_aggregate_id(aggregate_id).await.unwrap();

assert_eq!(events.len(), 2);
assert_eq!(events.len(), 4);

let events = vec![
SchemaEvent::EmptyEvent,
Expand All @@ -208,10 +259,10 @@ async fn main() {

let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events;

// The deprecated event is skipped
assert_eq!(events.len(), 5);
// The deprecated events are skipped
assert_eq!(events.len(), 7);

// After upcasting of EventA to EventC
// After upcasting of EventA
let upcasting_store: PgStore<SchemaAggregate<SchemaEventUpcasted>, Schema> = PgStoreBuilder::new(pool.clone())
.with_schema::<Schema>()
.try_build()
Expand All @@ -221,8 +272,9 @@ async fn main() {
let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events;

// All the events are visible
assert_eq!(events.len(), 6);
assert_eq!(events.len(), 9);

// The event has been upcasted
assert!(matches!(events[1], SchemaEventUpcasted::EventC { count: 1, .. }));
// The events have been upcasted
assert!(matches!(events[1], SchemaEventUpcasted::EventA { count: 1, .. }));
assert!(matches!(events[4], SchemaEventUpcasted::EventA { count: 1, .. }));
}

0 comments on commit e249895

Please sign in to comment.