Skip to content

Commit

Permalink
Use explicit methods on the conversion trait in place of making use o…
Browse files Browse the repository at this point in the history
…f from and to
  • Loading branch information
Johnabell committed Mar 25, 2024
1 parent e249895 commit 0bef40c
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 58 deletions.
54 changes: 24 additions & 30 deletions examples/schema/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,61 +125,55 @@ enum Schema {
#[cfg(feature = "upcasting")]
impl esrs::event::Upcaster for Schema {}

impl From<SchemaEvent> for Schema {
fn from(value: SchemaEvent) -> Self {
impl esrs::store::postgres::Schema<SchemaEvent> for Schema {
fn write(value: SchemaEvent) -> Self {
match value {
SchemaEvent::EmptyEvent => Schema::EmptyEvent,
SchemaEvent::EventB { count } => Schema::EventB { count },
SchemaEvent::EventC { contents, count } => Schema::EventC { contents, count },
}
}
}

impl From<SchemaEventUpcasted> for Schema {
fn from(value: SchemaEventUpcasted) -> Self {
match value {
SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent,
SchemaEventUpcasted::EventB { count } => Schema::EventB { count },
SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count },
fn read(self) -> Option<SchemaEvent> {
match self {
Self::EmptyEvent => Some(SchemaEvent::EmptyEvent),
Self::EventA { .. } => None,
Self::EventB { count } => Some(SchemaEvent::EventB { count }),
Self::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }),
}
}
}

impl From<SchemaEventOld> for Schema {
fn from(value: SchemaEventOld) -> Self {
impl esrs::store::postgres::Schema<SchemaEventOld> for Schema {
fn write(value: SchemaEventOld) -> Self {
match value {
SchemaEventOld::EmptyEvent => Schema::EmptyEvent,
SchemaEventOld::EventA { contents } => Schema::EventA { contents },
SchemaEventOld::EventB { count } => Schema::EventB { count },
}
}
}

impl From<Schema> for Option<SchemaEventOld> {
fn from(val: Schema) -> Self {
match val {
Schema::EmptyEvent => Some(SchemaEventOld::EmptyEvent),
Schema::EventA { contents } => Some(SchemaEventOld::EventA { contents }),
Schema::EventB { count } => Some(SchemaEventOld::EventB { count }),
Schema::EventC { .. } => panic!("not supported"),
fn read(self) -> Option<SchemaEventOld> {
match self {
Self::EmptyEvent => Some(SchemaEventOld::EmptyEvent),
Self::EventA { contents } => Some(SchemaEventOld::EventA { contents }),
Self::EventB { count } => Some(SchemaEventOld::EventB { count }),
Self::EventC { .. } => panic!("not supported"),
}
}
}

impl From<Schema> for Option<SchemaEvent> {
fn from(val: Schema) -> Self {
match val {
Schema::EmptyEvent => Some(SchemaEvent::EmptyEvent),
Schema::EventA { .. } => None,
Schema::EventB { count } => Some(SchemaEvent::EventB { count }),
Schema::EventC { contents, count } => Some(SchemaEvent::EventC { contents, count }),
impl esrs::store::postgres::Schema<SchemaEventUpcasted> for Schema {
fn write(value: SchemaEventUpcasted) -> Self {
match value {
SchemaEventUpcasted::EmptyEvent => Schema::EmptyEvent,
SchemaEventUpcasted::EventB { count } => Schema::EventB { count },
SchemaEventUpcasted::EventA { contents, count } => Schema::EventC { contents, count },
}
}
}

impl From<Schema> for Option<SchemaEventUpcasted> {
fn from(val: Schema) -> Self {
match val {
fn read(self) -> Option<SchemaEventUpcasted> {
match self {
Schema::EmptyEvent => Some(SchemaEventUpcasted::EmptyEvent),
Schema::EventA { contents } => Some(SchemaEventUpcasted::EventA { contents, count: 1 }),
Schema::EventB { count } => Some(SchemaEventUpcasted::EventB { count }),
Expand Down
10 changes: 5 additions & 5 deletions src/sql/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde_json::Value;
use uuid::Uuid;

use crate::event::Event;
use crate::store::postgres::Converter;
use crate::store::postgres::Schema;
use crate::store::StoreEvent;
use crate::types::SequenceNumber;

Expand All @@ -21,14 +21,14 @@ pub struct DbEvent {
}

impl DbEvent {
pub fn try_into_store_event<E, Schema>(self) -> Result<Option<StoreEvent<E>>, serde_json::Error>
pub fn try_into_store_event<E, S>(self) -> Result<Option<StoreEvent<E>>, serde_json::Error>
where
Schema: Converter<E>,
S: Schema<E>,
{
#[cfg(feature = "upcasting")]
let payload = Schema::upcast(self.payload, self.version)?.into();
let payload = S::upcast(self.payload, self.version)?.read();
#[cfg(not(feature = "upcasting"))]
let payload = serde_json::from_value::<Schema>(self.payload)?.into();
let payload = serde_json::from_value::<S>(self.payload)?.read();

Ok(match payload {
None => None,
Expand Down
8 changes: 4 additions & 4 deletions src/store/postgres/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::sql::statements::{Statements, StatementsHandler};
use crate::store::postgres::{InnerPgStore, PgStoreError};
use crate::Aggregate;

use super::{Converter, PgStore};
use super::{PgStore, Schema};

/// Struct used to build a brand new [`PgStore`].
pub struct PgStoreBuilder<A, Schema = <A as Aggregate>::Event>
Expand Down Expand Up @@ -46,7 +46,7 @@ where
}
}

impl<A, Schema> PgStoreBuilder<A, Schema>
impl<A, S> PgStoreBuilder<A, S>
where
A: Aggregate,
{
Expand Down Expand Up @@ -103,7 +103,7 @@ where
/// Set the schema of the underlying PgStore.
pub fn with_schema<NewSchema>(self) -> PgStoreBuilder<A, NewSchema>
where
NewSchema: Converter<A::Event> + Event + Send + Sync,
NewSchema: Schema<A::Event> + Event + Send + Sync,
{
PgStoreBuilder {
pool: self.pool,
Expand All @@ -125,7 +125,7 @@ where
/// # Errors
///
/// Will return an `Err` if there's an error running [`Migrations`].
pub async fn try_build(self) -> Result<PgStore<A, Schema>, sqlx::Error> {
pub async fn try_build(self) -> Result<PgStore<A, S>, sqlx::Error> {
if self.run_migrations {
Migrations::run::<A>(&self.pool).await?;
}
Expand Down
131 changes: 112 additions & 19 deletions src/store/postgres/event_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,100 @@ use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop};
use crate::types::SequenceNumber;
use crate::{Aggregate, AggregateState};

pub trait Converter<E>: From<E> + Into<Option<E>> + Event {}
/// To support decoupling between the Aggregate::Event type and the schema of the DB table
/// in `PgStore` you can create a schema type that implements `Event` and `Schema`
/// where `E = Aggregate::Event`.
///
/// Note: Although `Schema::read` returns an `Option` for any given event and implementation.
///
/// The following must hold
///
/// ```rust
/// # use serde::{Serialize, Deserialize};
/// # use esrs::store::postgres::Schema as SchemaTrait;
/// #
/// # #[derive(Clone, Eq, PartialEq, Debug)]
/// # struct Event {
/// # a: u32,
/// # }
/// #
/// # #[derive(Serialize, Deserialize)]
/// # struct Schema {
/// # a: u32,
/// # }
/// #
/// # #[cfg(feature = "upcasting")]
/// # impl esrs::event::Upcaster for Schema {}
/// #
/// # impl SchemaTrait<Event> for Schema {
/// # fn write(Event { a }: Event) -> Self {
/// # Self { a }
/// # }
/// #
/// # fn read(self) -> Option<Event> {
/// # Some(Event { a: self.a })
/// # }
/// # }
/// #
/// # let event = Event { a: 42 };
/// assert_eq!(Some(event.clone()), Schema::write(event).read());
/// ```
pub trait Schema<E>: Event {
/// Converts the event into the schema type.
fn write(event: E) -> Self;

/// Converts the schema into the event type.
///
/// This returns an option to enable skipping deprecated event which are persisted in the DB.
///
/// Note: Although `Schema::read` returns an `Option` for any given event and implementation.
///
/// The following must hold
///
/// ```rust
/// # use serde::{Serialize, Deserialize};
/// # use esrs::store::postgres::Schema as SchemaTrait;
/// #
/// # #[derive(Clone, Eq, PartialEq, Debug)]
/// # struct Event {
/// # a: u32,
/// # }
/// #
/// # #[derive(Serialize, Deserialize)]
/// # struct Schema {
/// # a: u32,
/// # }
/// #
/// # #[cfg(feature = "upcasting")]
/// # impl esrs::event::Upcaster for Schema {}
/// #
/// # impl SchemaTrait<Event> for Schema {
/// # fn write(Event { a }: Event) -> Self {
/// # Self { a }
/// # }
/// #
/// # fn read(self) -> Option<Event> {
/// # Some(Event { a: self.a })
/// # }
/// # }
/// #
/// # let event = Event { a: 42 };
/// assert_eq!(Some(event.clone()), Schema::write(event).read());
/// ```
fn read(self) -> Option<E>;
}

impl<T, E> Converter<E> for T where T: From<E> + Into<Option<E>> + Event {}
impl<E> Schema<E> for E
where
E: Event,
{
fn write(event: E) -> Self {
event
}
fn read(self) -> Option<E> {
Some(self)
}
}

/// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a
/// pre-made implementation of an [`EventStore`] persisting on Postgres.
Expand All @@ -33,14 +124,15 @@ impl<T, E> Converter<E> for T where T: From<E> + Into<Option<E>> + Event {}
/// reference.
///
/// To decouple persistence from the event types, it is possible to optionally, specify the
/// Database event schema for this store as a serializable type.
/// Database event schema for this store as a type that implements `Event` and
/// `Schema<Aggregate::Event>`.
///
/// When events are persisted, they will first be converted via the `From` trait into the `Schema`
/// type, then serialized.
/// When events are persisted, they will first be converted to the `Schema` type using
/// `Schema::write` then serialized using the `Serialize` implementation on `Schema`.
///
/// When events are read from the store, they will first be deserialized into the `Schema` and then
/// they can be converted into and option of the domain aggregate event. In this way it is possible
/// to deprecate events in core part of your application by returning `None` when converting.
/// When events are read from the store, they will first be deserialized into the `Schema` type and
/// then converted into an `Option<Aggregate::Event>` using `Schema::read`. In this way it is possible
/// to remove deprecate events in core part of your application by returning `None` from `Schema::read`.
pub struct PgStore<A, Schema = <A as Aggregate>::Event>
where
A: Aggregate,
Expand All @@ -61,11 +153,11 @@ where
pub(super) event_buses: Vec<Box<dyn EventBus<A> + Send>>,
}

impl<A, Schema> PgStore<A, Schema>
impl<A, S> PgStore<A, S>
where
A: Aggregate,
A::Event: Send + Sync,
Schema: Converter<A::Event> + Event + Send + Sync,
S: Schema<A::Event> + Event + Send + Sync,
{
/// Returns the name of the event store table
pub fn table_name(&self) -> &str {
Expand Down Expand Up @@ -99,10 +191,10 @@ where
let id: Uuid = Uuid::new_v4();

#[cfg(feature = "upcasting")]
let version: Option<i32> = Schema::current_version();
let version: Option<i32> = S::current_version();
#[cfg(not(feature = "upcasting"))]
let version: Option<i32> = None;
let schema = Schema::from(event);
let schema = S::write(event);

let _ = sqlx::query(self.inner.statements.insert())
.bind(id)
Expand All @@ -117,8 +209,9 @@ where
Ok(StoreEvent {
id,
aggregate_id,
payload: schema.into().expect(
"This should always be true for converters assert event == Converter::from(event).into().unwrap()",
payload: schema.read().expect(
"For any type that implements Schema the following contract should be upheld:\
assert_eq!(Some(event.clone()), Schema::write(event).read())",
),
occurred_on,
sequence_number,
Expand All @@ -135,7 +228,7 @@ where
Box::pin({
sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all())
.fetch(executor)
.map(|res| Ok(res?.try_into_store_event::<_, Schema>()?))
.map(|res| Ok(res?.try_into_store_event::<_, S>()?))
.map(Result::transpose)
.filter_map(std::future::ready)
})
Expand All @@ -158,12 +251,12 @@ pub struct PgStoreLockGuard {
impl UnlockOnDrop for PgStoreLockGuard {}

#[async_trait]
impl<A, Schema> EventStore for PgStore<A, Schema>
impl<A, S> EventStore for PgStore<A, S>
where
A: Aggregate,
A::State: Send,
A::Event: Send + Sync,
Schema: Converter<A::Event> + Event + Send + Sync,
S: Schema<A::Event> + Event + Send + Sync,
{
type Aggregate = A;
type Error = PgStoreError;
Expand All @@ -186,7 +279,7 @@ where
.fetch_all(&self.inner.pool)
.await?
.into_iter()
.map(|event| Ok(event.try_into_store_event::<_, Schema>()?))
.map(|event| Ok(event.try_into_store_event::<_, S>()?))
.filter_map(Result::transpose)
.collect::<Result<Vec<StoreEvent<A::Event>>, Self::Error>>()?)
}
Expand Down Expand Up @@ -323,7 +416,7 @@ impl<T: Aggregate> std::fmt::Debug for PgStore<T> {
}
}

impl<A, Schema> Clone for PgStore<A, Schema>
impl<A, S> Clone for PgStore<A, S>
where
A: Aggregate,
{
Expand Down

0 comments on commit 0bef40c

Please sign in to comment.