Skip to content

Commit 1fd5f93

Browse files
fix(udb): fix rocksdb conflict tracker (#3225)
* feat(engine): add RocksDB engine run script * fix(udb): fix rocksdb conflict tracker --------- Co-authored-by: Nathan Flurry <[email protected]>
1 parent 1d59170 commit 1fd5f93

File tree

11 files changed

+395
-547
lines changed

11 files changed

+395
-547
lines changed

packages/common/metrics/src/providers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub fn set_sampler_ratio(ratio: f64) -> anyhow::Result<()> {
7272
}
7373

7474
fn resource() -> Resource {
75-
let mut resource = Resource::builder()
75+
let resource = Resource::builder()
7676
.with_service_name(rivet_env::service_name())
7777
.with_schema_url(
7878
[KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION"))],

packages/common/universaldb/src/driver/postgres/database.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use std::{
2-
sync::{Arc, Mutex},
2+
sync::{
3+
Arc,
4+
atomic::{AtomicI32, Ordering},
5+
},
36
time::Duration,
47
};
58

@@ -23,7 +26,7 @@ const GC_INTERVAL: Duration = Duration::from_secs(5);
2326

2427
pub struct PostgresDatabaseDriver {
2528
pool: Arc<Pool>,
26-
max_retries: Arc<Mutex<i32>>,
29+
max_retries: AtomicI32,
2730
gc_handle: JoinHandle<()>,
2831
}
2932

@@ -162,7 +165,7 @@ impl PostgresDatabaseDriver {
162165

163166
Ok(PostgresDatabaseDriver {
164167
pool: Arc::new(pool),
165-
max_retries: Arc::new(Mutex::new(100)),
168+
max_retries: AtomicI32::new(100),
166169
gc_handle,
167170
})
168171
}
@@ -182,7 +185,7 @@ impl DatabaseDriver for PostgresDatabaseDriver {
182185
) -> BoxFut<'a, Result<Erased>> {
183186
Box::pin(async move {
184187
let mut maybe_committed = MaybeCommitted(false);
185-
let max_retries = *self.max_retries.lock().unwrap();
188+
let max_retries = self.max_retries.load(Ordering::SeqCst);
186189

187190
for attempt in 0..max_retries {
188191
let tx = self.create_trx()?;
@@ -227,7 +230,7 @@ impl DatabaseDriver for PostgresDatabaseDriver {
227230
fn set_option(&self, opt: DatabaseOption) -> Result<()> {
228231
match opt {
229232
DatabaseOption::TransactionRetryLimit(limit) => {
230-
*self.max_retries.lock().unwrap() = limit;
233+
self.max_retries.store(limit, Ordering::SeqCst);
231234
Ok(())
232235
}
233236
}

packages/common/universaldb/src/driver/postgres/transaction.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ impl TransactionDriver for PostgresTransactionDriver {
223223

224224
let (operations, conflict_ranges) = self.operations.consume();
225225

226-
// We have operations but no transaction - create one just for commit
227226
let tx_sender = self.ensure_transaction().await?;
228227

229228
// Send commit command
@@ -250,8 +249,8 @@ impl TransactionDriver for PostgresTransactionDriver {
250249
self.operations.clear_all();
251250
self.committed.store(false, Ordering::SeqCst);
252251

253-
// Note: We can't reset the transaction once it's created
254-
// The transaction task will continue running
252+
// Replace tx sender to get a new txn version
253+
self.tx_sender = OnceCell::new();
255254
}
256255

257256
fn cancel(&self) {

0 commit comments

Comments
 (0)