Skip to content

Commit

Permalink
scribe for postgres only
Browse files Browse the repository at this point in the history
  • Loading branch information
angelo-rendina-prima committed Mar 22, 2024
1 parent 04c87d8 commit 661eee6
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ version = "0.14.0"
all-features = true

[features]
default = []
default = ["postgres"]
sql = ["sqlx"]
postgres = ["sql", "sqlx/postgres", "typed-builder", "tokio"]
rebuilder = []
Expand Down
10 changes: 6 additions & 4 deletions examples/store_crud/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@
//! This operation enables you to delete all events associated with a particular aggregate ID from
//! the event store.
use std::convert::TryInto;

use chrono::Utc;
use sqlx::types::Json;
use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::sql::event::DbEvent;
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::postgres::{JsonScribe, PgStore, PgStoreBuilder};
use esrs::store::{EventStore, StoreEvent};
use esrs::AggregateState;

Expand Down Expand Up @@ -144,5 +142,9 @@ async fn get_event_by_event_id(id: Uuid, table_name: &str, pool: &Pool<Postgres>
.fetch_optional(pool)
.await
.unwrap()
.map(|v| v.try_into().unwrap())
.map(|v| {
v.to_store_event::<_, JsonScribe>()
.expect("Should read Json")
.expect("Should be populated")
})
}
1 change: 1 addition & 0 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ where
pub trait TransactionalEventHandler<A, Er, Ex>: Sync
where
A: Aggregate,
Ex: Send,
{
/// Handle an event in a transactional fashion and perform a read side crate, update or delete.
/// If an error is returned the transaction will be aborted and the handling of a command by an
Expand Down
36 changes: 19 additions & 17 deletions src/sql/event.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::convert::TryInto;

use chrono::{DateTime, Utc};
use serde_json::Value;
use uuid::Uuid;

use crate::event::Event;
use crate::store::postgres::Scribe;
use crate::store::StoreEvent;
use crate::types::SequenceNumber;

Expand All @@ -19,20 +17,24 @@ pub struct DbEvent {
pub version: Option<i32>,
}

impl<E: Event> TryInto<StoreEvent<E>> for DbEvent {
type Error = serde_json::Error;

fn try_into(self) -> Result<StoreEvent<E>, Self::Error> {
Ok(StoreEvent {
id: self.id,
aggregate_id: self.aggregate_id,
#[cfg(feature = "upcasting")]
payload: E::upcast(self.payload, self.version)?,
#[cfg(not(feature = "upcasting"))]
payload: serde_json::from_value::<E>(self.payload)?,
occurred_on: self.occurred_on,
sequence_number: self.sequence_number,
version: self.version,
impl DbEvent {
pub fn to_store_event<E, S>(self) -> Result<Option<StoreEvent<E>>, serde_json::Error>
where
S: Scribe<E>,
{
Ok(match S::deserialize(self.payload)? {
None => None,
Some(event) => Some(StoreEvent {
id: self.id,
aggregate_id: self.aggregate_id,
#[cfg(feature = "upcasting")]
payload: E::upcast(self.payload, self.version)?,
#[cfg(not(feature = "upcasting"))]
payload: event,
occurred_on: self.occurred_on,
sequence_number: self.sequence_number,
version: self.version,
}),
})
}
}
5 changes: 3 additions & 2 deletions src/store/postgres/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::bus::EventBus;
use crate::handler::{EventHandler, TransactionalEventHandler};
use crate::sql::migrations::{Migrations, MigrationsHandler};
use crate::sql::statements::{Statements, StatementsHandler};
use crate::store::postgres::{InnerPgStore, PgStoreError};
use crate::store::postgres::{InnerPgStore, PgStoreError, Scribe};
use crate::Aggregate;

use super::PgStore;
Expand Down Expand Up @@ -100,7 +100,7 @@ where
/// # Errors
///
/// Will return an `Err` if there's an error running [`Migrations`].
pub async fn try_build(self) -> Result<PgStore<A>, sqlx::Error> {
pub async fn try_build<S: Scribe<A::Event>>(self) -> Result<PgStore<A, S>, sqlx::Error> {
if self.run_migrations {
Migrations::run::<A>(&self.pool).await?;
}
Expand All @@ -113,6 +113,7 @@ where
transactional_event_handlers: self.transactional_event_handlers,
event_buses: self.event_buses,
}),
_scribe: std::marker::PhantomData,
})
}
}
62 changes: 50 additions & 12 deletions src/store/postgres/event_store.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::pool::PoolConnection;
use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey};
use sqlx::types::Json;
Expand All @@ -13,7 +15,6 @@ use tokio::sync::RwLock;
use uuid::Uuid;

use crate::bus::EventBus;
use crate::event::Event;
use crate::handler::{EventHandler, TransactionalEventHandler};
use crate::sql::event::DbEvent;
use crate::sql::statements::{Statements, StatementsHandler};
Expand All @@ -22,16 +23,39 @@ use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop};
use crate::types::SequenceNumber;
use crate::{Aggregate, AggregateState};

pub trait Scribe<E> {
fn serialize(event: &E) -> serde_json::Result<serde_json::Value>;

fn deserialize(value: serde_json::Value) -> serde_json::Result<Option<E>>;
}

pub struct JsonScribe;

impl<E> Scribe<E> for JsonScribe
where
E: Serialize + DeserializeOwned,
{
fn serialize(event: &E) -> serde_json::Result<serde_json::Value> {
serde_json::to_value(event)
}

fn deserialize(value: serde_json::Value) -> serde_json::Result<Option<E>> {
serde_json::from_value(value)
}
}

/// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a
/// pre-made implementation of an [`EventStore`] persisting on Postgres.
///
/// The store is protected by an [`Arc`] that allows it to be cloneable still having the same memory
/// reference.
pub struct PgStore<A>
pub struct PgStore<A, S = JsonScribe>
where
A: Aggregate,
S: Scribe<A::Event>,
{
pub(super) inner: Arc<InnerPgStore<A>>,
pub(super) _scribe: PhantomData<S>,
}

pub(super) struct InnerPgStore<A>
Expand All @@ -46,10 +70,11 @@ where
pub(super) event_buses: Vec<Box<dyn EventBus<A> + Send>>,
}

impl<A> PgStore<A>
impl<A, S> PgStore<A, S>
where
A: Aggregate,
A::Event: Event + Sync,
// A::Event: Sync,
S: Scribe<A::Event>,
{
/// Returns the name of the event store table
pub fn table_name(&self) -> &str {
Expand Down Expand Up @@ -90,10 +115,12 @@ where
#[cfg(not(feature = "upcasting"))]
let version: Option<i32> = None;

let payload = S::serialize(&event)?;

let _ = sqlx::query(self.inner.statements.insert())
.bind(id)
.bind(aggregate_id)
.bind(Json(&event))
.bind(Json(payload))
.bind(occurred_on)
.bind(sequence_number)
.bind(version)
Expand All @@ -119,7 +146,11 @@ where
Box::pin({
sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all())
.fetch(executor)
.map(|res| Ok(res?.try_into()?))
.filter_map(|res| async {
res.map_err(PgStoreError::from)
.and_then(|db_event| db_event.to_store_event::<_, S>().map_err(Into::into))
.transpose()
})
})
}
}
Expand All @@ -140,11 +171,12 @@ pub struct PgStoreLockGuard {
impl UnlockOnDrop for PgStoreLockGuard {}

#[async_trait]
impl<A> EventStore for PgStore<A>
impl<A, S> EventStore for PgStore<A, S>
where
A: Aggregate,
A::State: Send,
A::Event: Event + Send + Sync,
A::Event: Send + Sync,
S: Scribe<A::Event> + Sync,
{
type Aggregate = A;
type Error = PgStoreError;
Expand All @@ -167,7 +199,7 @@ where
.fetch_all(&self.inner.pool)
.await?
.into_iter()
.map(|event| Ok(event.try_into()?))
.filter_map(|event| event.to_store_event::<_, S>().map_err(Into::into).transpose())
.collect::<Result<Vec<StoreEvent<A::Event>>, Self::Error>>()?)
}

Expand Down Expand Up @@ -293,21 +325,27 @@ where

/// Debug implementation for [`PgStore`]. It just shows the statements, that are the only thing
/// that might be useful to debug.
impl<T: Aggregate> std::fmt::Debug for PgStore<T> {
impl<A, S> std::fmt::Debug for PgStore<A, S>
where
A: Aggregate,
S: Scribe<A::Event>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PgStore")
.field("statements", &self.inner.statements)
.finish()
}
}

impl<A> Clone for PgStore<A>
impl<A, S> Clone for PgStore<A, S>
where
A: Aggregate,
S: Scribe<A::Event>,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
_scribe: PhantomData,
}
}
}

0 comments on commit 661eee6

Please sign in to comment.