Skip to content

Commit

Permalink
Return the result summary from run like methods (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
knutwalker authored Feb 12, 2025
1 parent c2b733d commit 7561bbe
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 35 deletions.
8 changes: 5 additions & 3 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use {
crate::connection::{ConnectionInfo, Routing},
crate::graph::ConnectionPoolManager::Routed,
crate::routing::{ClusterRoutingTableProvider, RoutedConnectionManager},
crate::summary::ResultSummary,
log::debug,
};

use crate::graph::ConnectionPoolManager::Direct;
use crate::pool::ManagedConnection;
use crate::RunResult;
use crate::{
config::{Config, ConfigBuilder, Database, LiveConfig},
errors::Result,
Expand Down Expand Up @@ -163,7 +165,7 @@ impl Graph {
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run(&self, q: impl Into<Query>) -> Result<()> {
pub async fn run(&self, q: impl Into<Query>) -> Result<RunResult> {
self.impl_run_on(self.config.db.clone(), q.into(), Operation::Write)
.await
}
Expand All @@ -185,7 +187,7 @@ impl Graph {
db: impl Into<Database>,
q: impl Into<Query>,
operation: Operation,
) -> Result<()> {
) -> Result<ResultSummary> {
self.impl_run_on(Some(db.into()), q.into(), operation).await
}

Expand All @@ -201,7 +203,7 @@ impl Graph {
db: Option<Database>,
q: Query,
operation: Operation,
) -> Result<()> {
) -> Result<RunResult> {
backoff::future::retry_notify(
self.pool.backoff(),
|| {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ pub use crate::errors::{
Error, Neo4jClientErrorKind, Neo4jError, Neo4jErrorKind, Neo4jSecurityErrorKind, Result,
};
pub use crate::graph::{query, Graph};
pub use crate::query::{Query, QueryParameter};
pub use crate::query::{Query, QueryParameter, RunResult};
pub use crate::row::{Node, Path, Point2D, Point3D, Relation, Row, UnboundedRelation};
pub use crate::stream::{DetachedRowStream, RowStream};
pub use crate::txn::Txn;
Expand Down
35 changes: 14 additions & 21 deletions lib/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::{Cell, RefCell};

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
use crate::bolt::{Discard, Summary, WrapExtra as _};
use crate::{bolt::Summary, summary::ResultSummary};
use crate::{
errors::Result,
messages::{BoltRequest, BoltResponse},
Expand All @@ -11,6 +11,11 @@ use crate::{
Error, Success,
};

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub type RunResult = ResultSummary;
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub type RunResult = ();

/// Abstracts a cypher query that is sent to neo4j server.
#[derive(Clone)]
pub struct Query {
Expand Down Expand Up @@ -83,7 +88,7 @@ impl Query {
&self.params
}

pub(crate) async fn run(self, connection: &mut ManagedConnection) -> Result<()> {
pub(crate) async fn run(self, connection: &mut ManagedConnection) -> Result<RunResult> {
let request = BoltRequest::run(&self.query, self.params, self.extra);
Self::try_run(request, connection)
.await
Expand All @@ -93,7 +98,7 @@ impl Query {
pub(crate) async fn run_retryable(
&self,
connection: &mut ManagedConnection,
) -> QueryResult<()> {
) -> QueryResult<RunResult> {
let request = BoltRequest::run(&self.query, self.params.clone(), self.extra.clone());
Self::try_run(request, connection).await
}
Expand All @@ -120,24 +125,12 @@ impl Query {
.map_err(unwrap_backoff)
}

async fn try_run(request: BoltRequest, connection: &mut ManagedConnection) -> QueryResult<()> {
let _ = Self::try_request(request, connection).await?;

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
{
match connection.send_recv(BoltRequest::discard_all()).await {
Ok(BoltResponse::Success(_)) => Ok(()),
otherwise => wrap_error(otherwise, "DISCARD"),
}
}

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
{
match connection.send_recv_as(Discard::all()).await {
Ok(Summary::Success(_discard_success)) => Ok(()),
otherwise => wrap_error(otherwise, "DISCARD"),
}
}
async fn try_run(
request: BoltRequest,
connection: &mut ManagedConnection,
) -> QueryResult<RunResult> {
let result = Self::try_execute(request, 4096, connection).await?;
Ok(result.finish(connection).await?)
}

async fn try_execute(
Expand Down
11 changes: 3 additions & 8 deletions lib/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
row::Row,
txn::TransactionHandle,
types::BoltList,
DeError,
DeError, RunResult,
};

use futures::{stream::try_unfold, TryStream};
Expand All @@ -26,11 +26,6 @@ type BoxedSummary = Box<ResultSummary>;
#[cfg(not(feature = "unstable-result-summary"))]
type BoxedSummary = ();

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
type FinishResult = ResultSummary;
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
type FinishResult = ();

/// An abstraction over a stream of rows, this is returned as a result of [`crate::Txn::execute`].
///
/// A stream needs a running transaction to be consumed.
Expand Down Expand Up @@ -250,7 +245,7 @@ impl RowStream {

/// Stop consuming the stream and return a summary, if available.
/// Stopping the stream will also discard any messages on the server side.
pub async fn finish(mut self, mut handle: impl TransactionHandle) -> Result<FinishResult> {
pub async fn finish(mut self, mut handle: impl TransactionHandle) -> Result<RunResult> {
self.buffer.clear();

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
Expand Down Expand Up @@ -440,7 +435,7 @@ impl DetachedRowStream {

/// Stop consuming the stream and return a summary, if available.
/// Stopping the stream will also discard any messages on the server side.
pub async fn finish(mut self) -> Result<FinishResult> {
pub async fn finish(mut self) -> Result<RunResult> {
self.stream.finish(&mut self.connection).await
}

Expand Down
23 changes: 23 additions & 0 deletions lib/src/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,29 @@ pub struct Counters {
pub system_updates: u64,
}

impl Counters {
pub fn merge(&mut self, other: &Self) {
self.nodes_created += other.nodes_created;
self.nodes_deleted += other.nodes_deleted;
self.relationships_created += other.relationships_created;
self.relationships_deleted += other.relationships_deleted;
self.properties_set += other.properties_set;
self.labels_added += other.labels_added;
self.labels_removed += other.labels_removed;
self.indexes_added += other.indexes_added;
self.indexes_removed += other.indexes_removed;
self.constraints_added += other.constraints_added;
self.constraints_removed += other.constraints_removed;
self.system_updates += other.system_updates;
}
}

impl std::ops::AddAssign<&Counters> for Counters {
fn add_assign(&mut self, other: &Counters) {
self.merge(other);
}
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(from = "SummaryBuilder")]
pub enum Streaming {
Expand Down
20 changes: 18 additions & 2 deletions lib/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
pool::ManagedConnection,
query::Query,
stream::RowStream,
Operation,
Operation, RunResult,
};

/// A handle which is used to control a transaction, created as a result of [`crate::Graph::start_txn`]
Expand Down Expand Up @@ -40,6 +40,22 @@ impl Txn {
}
}

#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
/// Runs multiple queries one after the other in the same connection,
/// merging all counters from each result summary.
pub async fn run_queries<Q: Into<Query>>(
&mut self,
queries: impl IntoIterator<Item = Q>,
) -> Result<crate::summary::Counters> {
let mut counters = crate::summary::Counters::default();
for query in queries {
let summary = self.run(query.into()).await?;
counters += summary.stats();
}
Ok(counters)
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
/// Runs multiple queries one after the other in the same connection
pub async fn run_queries<Q: Into<Query>>(
&mut self,
Expand All @@ -52,7 +68,7 @@ impl Txn {
}

/// Runs a single query and discards the stream.
pub async fn run(&mut self, q: impl Into<Query>) -> Result<()> {
pub async fn run(&mut self, q: impl Into<Query>) -> Result<RunResult> {
let mut query = q.into();
if let Some(db) = self.db.as_ref() {
query = query.extra("db", db.to_string());
Expand Down

0 comments on commit 7561bbe

Please sign in to comment.