Skip to content

Commit 1d633b7

Browse files
add sync/async enum
1 parent 245fa9a commit 1d633b7

29 files changed

+231
-252
lines changed

src/client/executor.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use crate::{
5555
CommandErrorBody,
5656
CommitTransaction,
5757
Operation,
58+
OperationResponse,
5859
Retryability,
5960
},
6061
options::{ChangeStreamOptions, SelectionCriteria},
@@ -766,14 +767,15 @@ impl Client {
766767
}
767768
};
768769

769-
match op
770-
.handle_response(
771-
response,
772-
connection.stream_description()?,
773-
session.as_deref_mut(),
774-
)
775-
.await
776-
{
770+
let response_result = match op.handle_response(
771+
response,
772+
connection.stream_description()?,
773+
session.as_deref_mut(),
774+
) {
775+
OperationResponse::Sync(result) => result,
776+
OperationResponse::Async(future) => future.await,
777+
};
778+
match response_result {
777779
Ok(response) => Ok(response),
778780
Err(mut err) => {
779781
err.add_labels_and_update_pin(

src/coll.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ where
11371137

11381138
impl<T> Collection<T>
11391139
where
1140-
T: Serialize + Send + Sync,
1140+
T: Serialize,
11411141
{
11421142
#[allow(clippy::needless_option_as_deref)]
11431143
async fn insert_many_common(

src/operation.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,28 @@ const MAX_ENCRYPTED_WRITE_SIZE: usize = 2_097_152;
8383
// The amount of overhead bytes to account for when building a document sequence.
8484
const COMMAND_OVERHEAD_SIZE: usize = 16_000;
8585

86+
pub(crate) enum OperationResponse<'a, O> {
87+
Sync(Result<O>),
88+
Async(BoxFuture<'a, Result<O>>),
89+
}
90+
91+
impl<'a, O> OperationResponse<'a, O> {
92+
fn get_sync_result(self) -> Result<O> {
93+
match self {
94+
Self::Sync(result) => result,
95+
Self::Async(_) => unreachable!(),
96+
}
97+
}
98+
}
99+
100+
macro_rules! sync {
101+
($result:block) => {
102+
let result = || $result;
103+
OperationResponse::Sync(result())
104+
};
105+
}
106+
use sync;
107+
86108
/// A trait modeling the behavior of a server side operation.
87109
///
88110
/// No methods in this trait should have default behaviors to ensure that wrapper operations
@@ -111,7 +133,7 @@ pub(crate) trait Operation {
111133
response: RawCommandResponse,
112134
description: &'a StreamDescription,
113135
session: Option<&'a mut ClientSession>,
114-
) -> BoxFuture<'a, Result<Self::O>>;
136+
) -> OperationResponse<'a, Self::O>;
115137

116138
/// Interpret an error encountered while sending the built command to the server, potentially
117139
/// recovering.
@@ -425,7 +447,7 @@ pub(crate) trait OperationWithDefaults {
425447
response: RawCommandResponse,
426448
description: &'a StreamDescription,
427449
session: Option<&'a mut ClientSession>,
428-
) -> BoxFuture<'a, Result<Self::O>>;
450+
) -> OperationResponse<'a, Self::O>;
429451

430452
/// Interpret an error encountered while sending the built command to the server, potentially
431453
/// recovering.
@@ -492,7 +514,7 @@ impl<T: OperationWithDefaults> Operation for T {
492514
response: RawCommandResponse,
493515
description: &'a StreamDescription,
494516
session: Option<&'a mut ClientSession>,
495-
) -> BoxFuture<'a, Result<Self::O>> {
517+
) -> OperationResponse<'a, Self::O> {
496518
self.handle_response(response, description, session)
497519
}
498520
fn handle_error(&self, error: Error) -> Result<Self::O> {

src/operation/abort_transaction.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use bson::Document;
2-
use futures_util::FutureExt;
32

43
use crate::{
54
bson::doc,
@@ -9,11 +8,10 @@ use crate::{
98
operation::Retryability,
109
options::WriteConcern,
1110
selection_criteria::SelectionCriteria,
12-
BoxFuture,
1311
ClientSession,
1412
};
1513

16-
use super::{OperationWithDefaults, WriteConcernOnlyBody};
14+
use super::{sync, OperationResponse, OperationWithDefaults, WriteConcernOnlyBody};
1715

1816
pub(crate) struct AbortTransaction {
1917
write_concern: Option<WriteConcern>,
@@ -52,19 +50,28 @@ impl OperationWithDefaults for AbortTransaction {
5250
))
5351
}
5452

55-
fn handle_response(
56-
&self,
53+
fn handle_response<'a>(
54+
&'a self,
5755
response: RawCommandResponse,
58-
_description: &StreamDescription,
59-
_session: Option<&mut ClientSession>,
60-
) -> BoxFuture<'static, Result<Self::O>> {
61-
async move {
56+
_description: &'a StreamDescription,
57+
_session: Option<&'a mut ClientSession>,
58+
) -> OperationResponse<'a, Self::O> {
59+
sync! {{
6260
let response: WriteConcernOnlyBody = response.body()?;
6361
response.validate()
64-
}
65-
.boxed()
62+
}}
6663
}
6764

65+
// handle_response!((
66+
// &self,
67+
// response: RawCommandResponse,
68+
// description: &StreamDescription,
69+
// session: Option<&mut ClientSession
70+
// ) -> Result<Self::O> {
71+
// let response: WriteConcernOnlyBody = response.body()?;
72+
// response.validate()
73+
// });
74+
6875
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
6976
match &self.pinned {
7077
Some(TransactionPin::Mongos(s)) => Some(s),

src/operation/aggregate.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ use crate::{
88
error::Result,
99
operation::{append_options, remove_empty_write_concern, Retryability},
1010
options::{AggregateOptions, SelectionCriteria, WriteConcern},
11-
BoxFuture,
1211
ClientSession,
1312
Namespace,
1413
};
1514

1615
use super::{
16+
sync,
1717
CursorBody,
18+
OperationResponse,
1819
OperationWithDefaults,
1920
WriteConcernOnlyBody,
2021
SERVER_4_2_0_WIRE_VERSION,
2122
SERVER_4_4_0_WIRE_VERSION,
2223
};
2324

2425
pub(crate) use change_stream::ChangeStreamAggregate;
25-
use futures_util::FutureExt;
2626

2727
#[derive(Debug)]
2828
pub(crate) struct Aggregate {
@@ -97,8 +97,8 @@ impl OperationWithDefaults for Aggregate {
9797
response: RawCommandResponse,
9898
description: &'a StreamDescription,
9999
_session: Option<&mut ClientSession>,
100-
) -> BoxFuture<'a, Result<Self::O>> {
101-
async move {
100+
) -> OperationResponse<'static, Self::O> {
101+
sync! {{
102102
let cursor_response: CursorBody = response.body()?;
103103

104104
if self.is_out_or_merge() {
@@ -122,8 +122,7 @@ impl OperationWithDefaults for Aggregate {
122122
self.options.as_ref().and_then(|opts| opts.max_await_time),
123123
comment,
124124
))
125-
}
126-
.boxed()
125+
}}
127126
}
128127

129128
fn selection_criteria(&self) -> Option<&SelectionCriteria> {

src/operation/aggregate/change_stream.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use futures_util::FutureExt;
2-
31
use crate::{
42
bson::{doc, Document},
53
change_stream::{event::ResumeToken, ChangeStreamData, WatchArgs},
@@ -8,11 +6,10 @@ use crate::{
86
error::Result,
97
operation::{append_options, OperationWithDefaults, Retryability},
108
options::{ChangeStreamOptions, SelectionCriteria, WriteConcern},
11-
BoxFuture,
129
ClientSession,
1310
};
1411

15-
use super::Aggregate;
12+
use super::{sync, Aggregate, OperationResponse};
1613

1714
pub(crate) struct ChangeStreamAggregate {
1815
inner: Aggregate,
@@ -91,16 +88,15 @@ impl OperationWithDefaults for ChangeStreamAggregate {
9188
response: RawCommandResponse,
9289
description: &'a StreamDescription,
9390
session: Option<&'a mut ClientSession>,
94-
) -> BoxFuture<'a, Result<Self::O>> {
95-
async move {
91+
) -> OperationResponse<'static, Self::O> {
92+
sync! {{
9693
let op_time = response
9794
.raw_body()
9895
.get("operationTime")?
9996
.and_then(bson::RawBsonRef::as_timestamp);
10097
let spec = self
10198
.inner
102-
.handle_response(response, description, session)
103-
.await?;
99+
.handle_response(response, description, session).get_sync_result()?;
104100

105101
let mut data = ChangeStreamData {
106102
resume_token: ResumeToken::initial(self.args.options.as_ref(), &spec),
@@ -120,8 +116,7 @@ impl OperationWithDefaults for ChangeStreamAggregate {
120116
}
121117

122118
Ok((spec, data))
123-
}
124-
.boxed()
119+
}}
125120
}
126121

127122
fn selection_criteria(&self) -> Option<&SelectionCriteria> {

src/operation/bulk_write.rs

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::{
2121
error::{Error, ErrorKind, Result, WriteError},
2222
operation::OperationWithDefaults,
2323
results::{DeleteResult, InsertOneResult, UpdateResult},
24-
BoxFuture,
2524
Client,
2625
ClientSession,
2726
Cursor,
@@ -31,6 +30,7 @@ use crate::{
3130

3231
use super::{
3332
CursorInfo,
33+
OperationResponse,
3434
Retryability,
3535
WriteResponseBody,
3636
COMMAND_OVERHEAD_SIZE,
@@ -263,67 +263,69 @@ impl<'a> OperationWithDefaults for BulkWrite<'a> {
263263
response: RawCommandResponse,
264264
description: &'b StreamDescription,
265265
session: Option<&'b mut ClientSession>,
266-
) -> BoxFuture<'b, Result<Self::O>> {
267-
async move {
268-
let response: WriteResponseBody<BulkWriteResponse> = response.body()?;
269-
270-
let mut result = BulkWriteResult::new();
271-
result.populate_summary_info(&response.summary);
272-
273-
let mut bulk_write_error = BulkWriteError::default();
274-
if let Some(write_concern_error) = response.write_concern_error {
275-
bulk_write_error
276-
.write_concern_errors
277-
.push(write_concern_error);
278-
}
279-
280-
let specification = CursorSpecification::new(
281-
response.body.cursor,
282-
description.server_address.clone(),
283-
None,
284-
None,
285-
self.options.and_then(|options| options.comment.clone()),
286-
);
287-
let iteration_result = match session {
288-
Some(session) => {
289-
let mut session_cursor =
290-
SessionCursor::new(self.client.clone(), specification, None);
291-
self.iterate_results_cursor(
292-
session_cursor.stream(session),
293-
&mut bulk_write_error,
294-
&mut result,
295-
)
296-
.await
266+
) -> OperationResponse<'b, Self::O> {
267+
OperationResponse::Async(
268+
async move {
269+
let response: WriteResponseBody<BulkWriteResponse> = response.body()?;
270+
271+
let mut result = BulkWriteResult::new();
272+
result.populate_summary_info(&response.summary);
273+
274+
let mut bulk_write_error = BulkWriteError::default();
275+
if let Some(write_concern_error) = response.write_concern_error {
276+
bulk_write_error
277+
.write_concern_errors
278+
.push(write_concern_error);
297279
}
298-
None => {
299-
let cursor = Cursor::new(self.client.clone(), specification, None, None);
300-
self.iterate_results_cursor(cursor, &mut bulk_write_error, &mut result)
280+
281+
let specification = CursorSpecification::new(
282+
response.body.cursor,
283+
description.server_address.clone(),
284+
None,
285+
None,
286+
self.options.and_then(|options| options.comment.clone()),
287+
);
288+
let iteration_result = match session {
289+
Some(session) => {
290+
let mut session_cursor =
291+
SessionCursor::new(self.client.clone(), specification, None);
292+
self.iterate_results_cursor(
293+
session_cursor.stream(session),
294+
&mut bulk_write_error,
295+
&mut result,
296+
)
301297
.await
302-
}
303-
};
304-
305-
let source = match iteration_result {
306-
Ok(()) => {
307-
if bulk_write_error.is_empty() {
308-
return Ok(result);
309-
} else {
310-
None
311298
}
299+
None => {
300+
let cursor = Cursor::new(self.client.clone(), specification, None, None);
301+
self.iterate_results_cursor(cursor, &mut bulk_write_error, &mut result)
302+
.await
303+
}
304+
};
305+
306+
let source = match iteration_result {
307+
Ok(()) => {
308+
if bulk_write_error.is_empty() {
309+
return Ok(result);
310+
} else {
311+
None
312+
}
313+
}
314+
Err(top_level_error) => Some(top_level_error),
315+
};
316+
317+
if !result.is_empty() {
318+
bulk_write_error.partial_result = Some(result);
312319
}
313-
Err(top_level_error) => Some(top_level_error),
314-
};
315320

316-
if !result.is_empty() {
317-
bulk_write_error.partial_result = Some(result);
321+
let error = Error::new(
322+
ErrorKind::ClientBulkWrite(bulk_write_error),
323+
response.labels,
324+
);
325+
Err(error.with_source(source))
318326
}
319-
320-
let error = Error::new(
321-
ErrorKind::ClientBulkWrite(bulk_write_error),
322-
response.labels,
323-
);
324-
Err(error.with_source(source))
325-
}
326-
.boxed()
327+
.boxed(),
328+
)
327329
}
328330

329331
fn handle_error(&self, error: Error) -> Result<Self::O> {

0 commit comments

Comments
 (0)