Skip to content

Commit 4b9cdca

Browse files
authored
Merge pull request #1162 from sfackler/transaction-builder-cancellation
Fix cancellation of TransactionBuilder::start
2 parents a1bdd0b + a0b2d70 commit 4b9cdca

File tree

2 files changed

+41
-41
lines changed

2 files changed

+41
-41
lines changed

tokio-postgres/src/client.rs

+3-39
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::codec::{BackendMessages, FrontendMessage};
1+
use crate::codec::BackendMessages;
22
use crate::config::SslMode;
33
use crate::connection::{Request, RequestMessages};
44
use crate::copy_out::CopyOutStream;
@@ -21,7 +21,7 @@ use fallible_iterator::FallibleIterator;
2121
use futures_channel::mpsc;
2222
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
2323
use parking_lot::Mutex;
24-
use postgres_protocol::message::{backend::Message, frontend};
24+
use postgres_protocol::message::backend::Message;
2525
use postgres_types::BorrowToSql;
2626
use std::collections::HashMap;
2727
use std::fmt;
@@ -532,43 +532,7 @@ impl Client {
532532
///
533533
/// The transaction will roll back by default - use the `commit` method to commit it.
534534
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
535-
struct RollbackIfNotDone<'me> {
536-
client: &'me Client,
537-
done: bool,
538-
}
539-
540-
impl<'a> Drop for RollbackIfNotDone<'a> {
541-
fn drop(&mut self) {
542-
if self.done {
543-
return;
544-
}
545-
546-
let buf = self.client.inner().with_buf(|buf| {
547-
frontend::query("ROLLBACK", buf).unwrap();
548-
buf.split().freeze()
549-
});
550-
let _ = self
551-
.client
552-
.inner()
553-
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
554-
}
555-
}
556-
557-
// This is done, as `Future` created by this method can be dropped after
558-
// `RequestMessages` is synchronously send to the `Connection` by
559-
// `batch_execute()`, but before `Responses` is asynchronously polled to
560-
// completion. In that case `Transaction` won't be created and thus
561-
// won't be rolled back.
562-
{
563-
let mut cleaner = RollbackIfNotDone {
564-
client: self,
565-
done: false,
566-
};
567-
self.batch_execute("BEGIN").await?;
568-
cleaner.done = true;
569-
}
570-
571-
Ok(Transaction::new(self))
535+
self.build_transaction().start().await
572536
}
573537

574538
/// Returns a builder for a transaction with custom settings.

tokio-postgres/src/transaction_builder.rs

+38-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{Client, Error, Transaction};
1+
use postgres_protocol::message::frontend;
2+
3+
use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};
24

35
/// The isolation level of a database transaction.
46
#[derive(Debug, Copy, Clone)]
@@ -106,7 +108,41 @@ impl<'a> TransactionBuilder<'a> {
106108
query.push_str(s);
107109
}
108110

109-
self.client.batch_execute(&query).await?;
111+
struct RollbackIfNotDone<'me> {
112+
client: &'me Client,
113+
done: bool,
114+
}
115+
116+
impl<'a> Drop for RollbackIfNotDone<'a> {
117+
fn drop(&mut self) {
118+
if self.done {
119+
return;
120+
}
121+
122+
let buf = self.client.inner().with_buf(|buf| {
123+
frontend::query("ROLLBACK", buf).unwrap();
124+
buf.split().freeze()
125+
});
126+
let _ = self
127+
.client
128+
.inner()
129+
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
130+
}
131+
}
132+
133+
// This is done as `Future` created by this method can be dropped after
134+
// `RequestMessages` is synchronously send to the `Connection` by
135+
// `batch_execute()`, but before `Responses` is asynchronously polled to
136+
// completion. In that case `Transaction` won't be created and thus
137+
// won't be rolled back.
138+
{
139+
let mut cleaner = RollbackIfNotDone {
140+
client: self.client,
141+
done: false,
142+
};
143+
self.client.batch_execute(&query).await?;
144+
cleaner.done = true;
145+
}
110146

111147
Ok(Transaction::new(self.client))
112148
}

0 commit comments

Comments
 (0)