Skip to content

Commit

Permalink
double result on handle command
Browse files Browse the repository at this point in the history
  • Loading branch information
angelo-rendina-prima committed May 7, 2024
1 parent 8766a1c commit f4d43de
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 131 deletions.
14 changes: 3 additions & 11 deletions examples/aggregate_deletion/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,18 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::{EventStore, StoreEvent};
use esrs::AggregateState;

use crate::common::basic::event_handler::BasicEventHandler;
use crate::common::basic::view::BasicView;
use crate::common::basic::{BasicAggregate, BasicCommand, BasicError, BasicEvent};
use crate::common::basic::{BasicAggregate, BasicCommand, BasicEvent};
use crate::common::util::new_pool;

#[path = "../common/lib.rs"]
mod common;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand All @@ -50,7 +42,7 @@ async fn main() {

let manager = AggregateManager::new(store.clone());

manager.handle_command::<Error>(state, command).await.unwrap();
manager.handle_command(state, command).await.unwrap().unwrap();

let row = view.by_id(id, &pool).await.unwrap().unwrap();

Expand Down
14 changes: 3 additions & 11 deletions examples/event_bus/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use uuid::Uuid;
use esrs::bus::kafka::{KafkaEventBus, KafkaEventBusConfig};
use esrs::bus::rabbit::{RabbitEventBus, RabbitEventBusConfig};
use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::AggregateState;

use crate::common::basic::event_handler::BasicEventHandler;
use crate::common::basic::view::BasicView;
use crate::common::basic::{BasicAggregate, BasicCommand, BasicError};
use crate::common::basic::{BasicAggregate, BasicCommand};
use crate::common::util::{new_pool, random_letters};
use crate::kafka::KafkaEventBusConsumer;
use crate::rabbit::RabbitEventBusConsumer;
Expand All @@ -32,14 +32,6 @@ mod common;
mod kafka;
mod rabbit;

#[derive(Debug, thiserror::Error)]
pub enum EventBusError {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -118,7 +110,7 @@ async fn main() {
let command = BasicCommand {
content: content.to_string(),
};
let _: Result<(), EventBusError> = manager.handle_command(aggregate_state, command).await;
manager.handle_command(aggregate_state, command).await.unwrap().unwrap();

let (rabbit_timeout_result, kafka_timeout_result) = tokio::join!(rabbit_join_handle, kafka_join_handle);

Expand Down
15 changes: 4 additions & 11 deletions examples/eventual_view/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,17 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::AggregateState;

use crate::common::basic::event_handler::BasicEventHandler;
use crate::common::basic::view::BasicView;
use crate::common::basic::{BasicAggregate, BasicCommand, BasicError};
use crate::common::basic::{BasicAggregate, BasicCommand};
use crate::common::util::new_pool;

#[path = "../common/lib.rs"]
mod common;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand All @@ -49,8 +41,9 @@ async fn main() {
};

AggregateManager::new(store)
.handle_command::<Error>(state, command)
.handle_command(state, command)
.await
.unwrap()
.unwrap();

let row = view.by_id(id, &pool).await.unwrap().unwrap();
Expand Down
6 changes: 4 additions & 2 deletions examples/locking_strategies/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ pub async fn increment_atomically(manager: Agg, aggregate_id: Uuid) -> Result<()
content: "whatever".to_string(),
},
)
.await
.await??;
Ok(())
}

/// Increment the value behind this `aggregate_id` with an optimistic locking strategy.
Expand All @@ -81,7 +82,8 @@ pub async fn increment_optimistically(manager: Agg, aggregate_id: Uuid) -> Resul
content: "whatever".to_string(),
},
)
.await
.await??;
Ok(())
}

/// Load the aggregate state for read-only purposes, preventing others (that use locking) from modifying it.
Expand Down
15 changes: 4 additions & 11 deletions examples/multi_aggregate_rebuild/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,12 @@ use crate::common::b::{AggregateB, CommandB, EventB};
use crate::common::shared::event_handler::SharedEventHandler;
use crate::common::shared::view::SharedView;
use crate::common::util::new_pool;
use crate::common::CommonError;
use crate::transactional_event_handler::SharedTransactionalEventHandler;

#[path = "../common/lib.rs"]
mod common;
mod transactional_event_handler;

#[derive(Debug, thiserror::Error)]
pub enum RebuilderError {
#[error(transparent)]
Aggregate(#[from] CommonError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -184,8 +175,9 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact

let manager: AggregateManager<PgStore<AggregateA>> = AggregateManager::new(pg_store_a);
manager
.handle_command::<RebuilderError>(AggregateState::default(), CommandA { v: 10, shared_id })
.handle_command(AggregateState::default(), CommandA { v: 10, shared_id })
.await
.unwrap()
.unwrap();

let pg_store_b: PgStore<AggregateB> = PgStoreBuilder::new(pool.clone())
Expand All @@ -202,8 +194,9 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact

let manager: AggregateManager<PgStore<AggregateB>> = AggregateManager::new(pg_store_b);
manager
.handle_command::<RebuilderError>(AggregateState::default(), CommandB { v: 7, shared_id })
.handle_command(AggregateState::default(), CommandB { v: 7, shared_id })
.await
.unwrap()
.unwrap();

assert_eq!(view.by_id(shared_id, &pool).await.unwrap().unwrap().sum, 17);
Expand Down
12 changes: 1 addition & 11 deletions examples/readme/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,11 @@ async fn main() {
.expect("Failed to create PgStore");

let manager: AggregateManager<_> = AggregateManager::new(store);
let _: Result<(), Error> = manager
let _: Result<Result<(), BookError>, PgStoreError> = manager
.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 })
.await;
}

//////////////////////////////
// Global error
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BookError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

//////////////////////////////
// Aggregate

Expand Down
15 changes: 4 additions & 11 deletions examples/rebuilder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::rebuilder::{PgRebuilder, Rebuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::AggregateState;

use crate::common::basic::view::BasicView;
use crate::common::basic::{BasicAggregate, BasicCommand, BasicError};
use crate::common::basic::{BasicAggregate, BasicCommand};
use crate::common::util::new_pool;
use crate::event_handler::{AnotherEventHandler, BasicEventHandlerV1, BasicEventHandlerV2};
use crate::transactional_event_handler::{BasicTransactionalEventHandlerV1, BasicTransactionalEventHandlerV2};
Expand All @@ -51,14 +51,6 @@ mod common;
mod event_handler;
mod transactional_event_handler;

#[derive(Debug, thiserror::Error)]
pub enum RebuilderError {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -96,13 +88,14 @@ async fn setup(aggregate_id: Uuid, view: BasicView, transactional_view: BasicVie

let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(pg_store);
manager
.handle_command::<RebuilderError>(
.handle_command(
AggregateState::with_id(aggregate_id),
BasicCommand {
content: "basic_command".to_string(),
},
)
.await
.unwrap()
.unwrap();

assert_eq!(
Expand Down
12 changes: 5 additions & 7 deletions examples/saga/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use esrs::store::postgres::PgStore;
use esrs::store::StoreEvent;

use crate::aggregate::{SagaAggregate, SagaCommand, SagaEvent};
use crate::SagaError;

#[derive(Clone)]
pub struct SagaEventHandler {
Expand All @@ -27,12 +26,11 @@ impl EventHandler<SagaAggregate> for SagaEventHandler {
Ok(Some(state)) => {
let mut guard = self.side_effect_mutex.lock().await;
*guard = true;
if let Err(err) = manager
.handle_command::<SagaError>(state, SagaCommand::RegisterMutation)
.await
{
eprintln!("Error while handling register mutation command: {:?}", err)
}
match manager.handle_command(state, SagaCommand::RegisterMutation).await {
Err(err) => eprintln!("Operational error while handling register mutation command: {:?}", err),
Ok(Err(err)) => eprintln!("Register mutation command denied: {:?}", err),
Ok(Ok(_)) => {}
};
}
Ok(None) => {
eprintln!("Something went wrong getting aggregate state")
Expand Down
7 changes: 5 additions & 2 deletions examples/saga/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ async fn main() {
let state: AggregateState<()> = AggregateState::default();
let id: Uuid = *state.id();

let result: Result<(), SagaError> = manager.handle_command(state, SagaCommand::RequestMutation).await;
assert!(result.is_ok());
manager
.handle_command(state, SagaCommand::RequestMutation)
.await
.unwrap()
.unwrap();

let events = store.by_aggregate_id(id).await.unwrap();

Expand Down
17 changes: 5 additions & 12 deletions examples/shared_view/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,18 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::AggregateState;

use crate::common::a::{AggregateA, CommandA};
use crate::common::b::{AggregateB, CommandB};
use crate::common::shared::event_handler::SharedEventHandler;
use crate::common::shared::view::SharedView;
use crate::common::util::new_pool;
use crate::common::CommonError;

#[path = "../common/lib.rs"]
mod common;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] CommonError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -53,15 +44,17 @@ async fn main() {
let state_a: AggregateState<i32> = AggregateState::new();
let aggregate_id_a: Uuid = *state_a.id();
AggregateManager::new(store_a)
.handle_command::<Error>(state_a, CommandA { v: 5, shared_id })
.handle_command(state_a, CommandA { v: 5, shared_id })
.await
.unwrap()
.unwrap();

let state_b: AggregateState<i32> = AggregateState::new();
let aggregate_id_b: Uuid = *state_b.id();
AggregateManager::new(store_b)
.handle_command::<Error>(state_b, CommandB { v: 7, shared_id })
.handle_command(state_b, CommandB { v: 7, shared_id })
.await
.unwrap()
.unwrap();

let shared_view = shared_view.by_id(shared_id, &pool).await.unwrap().unwrap();
Expand Down
17 changes: 4 additions & 13 deletions examples/transactional_view/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,19 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::EventStore;
use esrs::AggregateState;

use crate::common::basic::view::{BasicView, BasicViewRow};
use crate::common::basic::{BasicAggregate, BasicCommand, BasicError};
use crate::common::basic::{BasicAggregate, BasicCommand};
use crate::common::util::new_pool;
use crate::transactional_event_handler::BasicTransactionalEventHandler;

#[path = "../common/lib.rs"]
mod common;
mod transactional_event_handler;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand All @@ -61,7 +53,7 @@ async fn main() {

let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(store.clone());

let result: Result<(), Error> = manager.handle_command(state, command).await;
let result = manager.handle_command(state, command).await;

assert!(result.is_err());
// No events have been stored. Transactional event handler rollbacked the value
Expand All @@ -75,8 +67,7 @@ async fn main() {
let state: AggregateState<()> = AggregateState::new();
let id: Uuid = *state.id();

let result: Result<(), Error> = manager.handle_command(state, command).await;
assert!(result.is_ok());
manager.handle_command(state, command).await.unwrap().unwrap();

let view: BasicViewRow = view.by_id(id, &pool).await.unwrap().unwrap();
assert_eq!(view.content, content);
Expand Down
Loading

0 comments on commit f4d43de

Please sign in to comment.