Commit cd9f10f

store: Map other shards into a new shard on startup

Fixes #4719

1 parent 43baf5b

2 files changed: +55 -25 lines

store/postgres/src/catalog.rs (6 additions, 2 deletions)

@@ -415,14 +415,18 @@ pub fn drop_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError> {
     Ok(conn.batch_execute(&query)?)
 }
 
-pub fn migration_count(conn: &PgConnection) -> Result<i64, StoreError> {
+pub fn migration_count(conn: &PgConnection) -> Result<usize, StoreError> {
     use __diesel_schema_migrations as m;
 
     if !table_exists(conn, NAMESPACE_PUBLIC, &MIGRATIONS_TABLE)? {
         return Ok(0);
     }
 
-    m::table.count().get_result(conn).map_err(StoreError::from)
+    m::table
+        .count()
+        .get_result(conn)
+        .map(|n: i64| n as usize)
+        .map_err(StoreError::from)
 }
 
 pub fn account_like(conn: &PgConnection, site: &Site) -> Result<HashSet<String>, StoreError> {
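
The caller-facing change here is the return type: Diesel's count() yields Postgres's bigint as an i64, and the new code converts it once so every caller works with usize. A minimal standalone sketch of that pattern (the table declaration below is hypothetical, for illustration; graph-node's real schema mapping lives elsewhere):

use diesel::prelude::*;
use diesel::pg::PgConnection;

// Hypothetical table mapping, for illustration only.
diesel::table! {
    __diesel_schema_migrations (version) {
        version -> VarChar,
    }
}

fn migration_count(conn: &PgConnection) -> QueryResult<usize> {
    use self::__diesel_schema_migrations::dsl::*;
    __diesel_schema_migrations
        .count()
        .get_result(conn)
        // COUNT() comes back from Postgres as bigint (i64); the cast is
        // lossless for any realistic number of migration rows.
        .map(|n: i64| n as usize)
}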

store/postgres/src/connection_pool.rs (49 additions, 23 deletions)
@@ -1011,13 +1011,7 @@ impl PoolInner {
         let result = pool
             .configure_fdw(coord.servers.as_ref())
             .and_then(|()| migrate_schema(&pool.logger, &conn))
-            .and_then(|had_migrations| {
-                if had_migrations {
-                    coord.propagate_schema_change(&self.shard)
-                } else {
-                    Ok(())
-                }
-            });
+            .and_then(|count| coord.propagate(&pool, count));
         debug!(&pool.logger, "Release migration lock");
         advisory_lock::unlock_migration(&conn).unwrap_or_else(|err| {
             die(&pool.logger, "failed to release migration lock", &err);
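
Only the unlock side of the migration lock appears in this hunk. Per the doc comment on migrate_schema below, concurrent graph-node processes must not run migrations in parallel; the mechanism is a Postgres advisory lock. A rough sketch of the idea, with an arbitrary lock key rather than whatever key graph-node's advisory_lock module actually uses:

use diesel::prelude::*;
use diesel::pg::PgConnection;

// pg_advisory_lock blocks until the lock is free, so concurrent processes
// end up running their migrations one at a time. The key (1) is arbitrary
// here; all processes just have to agree on it.
fn with_migration_lock<T>(
    conn: &PgConnection,
    work: impl FnOnce(&PgConnection) -> QueryResult<T>,
) -> QueryResult<T> {
    diesel::sql_query("select pg_advisory_lock(1)").execute(conn)?;
    let result = work(conn);
    // Release the lock even if `work` failed.
    diesel::sql_query("select pg_advisory_unlock(1)").execute(conn)?;
    result
}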
@@ -1107,12 +1101,31 @@
 
 embed_migrations!("./migrations");
 
+struct MigrationCount {
+    old: usize,
+    new: usize,
+}
+
+impl MigrationCount {
+    fn new(old: usize, new: usize) -> Self {
+        Self { old, new }
+    }
+
+    fn had_migrations(&self) -> bool {
+        self.old != self.new
+    }
+
+    fn is_new(&self) -> bool {
+        self.old == 0
+    }
+}
+
 /// Run all schema migrations.
 ///
 /// When multiple `graph-node` processes start up at the same time, we ensure
 /// that they do not run migrations in parallel by using `blocking_conn` to
 /// serialize them. The `conn` is used to run the actual migration.
-fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<bool, StoreError> {
+fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<MigrationCount, StoreError> {
     // Collect migration logging output
     let mut output = vec![];
 
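
MigrationCount encodes the two facts the rest of the patch needs: is_new says the shard had no applied migrations at all before this startup (a freshly created database), while had_migrations says this startup changed the schema. With illustrative values:

fn main() {
    // MigrationCount as defined in the diff above.
    struct MigrationCount {
        old: usize,
        new: usize,
    }
    impl MigrationCount {
        fn had_migrations(&self) -> bool {
            self.old != self.new
        }
        fn is_new(&self) -> bool {
            self.old == 0
        }
    }

    // Brand-new shard: nothing had ever run, this startup applied 42 migrations.
    let fresh = MigrationCount { old: 0, new: 42 };
    assert!(fresh.is_new() && fresh.had_migrations());

    // Existing shard that was already up to date: neither case triggers.
    let stable = MigrationCount { old: 42, new: 42 };
    assert!(!stable.is_new() && !stable.had_migrations());
}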
@@ -1122,7 +1135,7 @@ fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<bool, StoreErr
     let result = embedded_migrations::run_with_output(conn, &mut output);
     info!(logger, "Migrations finished");
 
-    let had_migrations = catalog::migration_count(conn)? != old_count;
+    let new_count = catalog::migration_count(conn)?;
 
     // If there was any migration output, log it now
     let msg = String::from_utf8(output).unwrap_or_else(|_| String::from("<unreadable>"));
@@ -1136,14 +1149,15 @@ fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<bool, StoreErr
             debug!(logger, "Postgres migration output"; "output" => msg);
         }
     }
+    let count = MigrationCount::new(old_count, new_count);
 
-    if had_migrations {
+    if count.had_migrations() {
         // Reset the query statistics since a schema change makes them not
         // all that useful. An error here is not serious and can be ignored.
         conn.batch_execute("select pg_stat_statements_reset()").ok();
     }
 
-    Ok(had_migrations)
+    Ok(count)
 }
 
 /// Helper to coordinate propagating schema changes from the database that
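
Pieced together from the hunks above, migrate_schema now follows a count-before/count-after pattern. A condensed sketch of the flow (logging, output capture, and the real function's exact error plumbing omitted):

// Sketch only; run_with_output and its result handling are simplified away,
// and the error conversion on the migration run is an assumption.
fn migrate_schema_sketch(conn: &PgConnection) -> Result<MigrationCount, StoreError> {
    let old_count = catalog::migration_count(conn)?; // applied migrations before
    if let Err(e) = embedded_migrations::run(conn) {
        return Err(StoreError::Unknown(e.into())); // assumed error mapping
    }
    let new_count = catalog::migration_count(conn)?; // applied migrations after

    let count = MigrationCount::new(old_count, new_count);
    if count.had_migrations() {
        // Best-effort reset of query statistics; errors deliberately ignored.
        conn.batch_execute("select pg_stat_statements_reset()").ok();
    }
    Ok(count)
}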
@@ -1207,18 +1221,23 @@ impl PoolCoordinator {
 
     /// Propagate changes to the schema in `shard` to all other pools. Those
     /// other pools will then recreate any tables that they imported from
-    /// `shard`
-    fn propagate_schema_change(&self, shard: &Shard) -> Result<(), StoreError> {
-        let server = self
-            .servers
-            .iter()
-            .find(|server| &server.shard == shard)
-            .ok_or_else(|| constraint_violation!("unknown shard {shard}"))?;
-
-        for pool in self.pools.lock().unwrap().values() {
-            if let Err(e) = pool.remap(server) {
-                error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string());
-                return Err(e);
+    /// `shard`. If `pool` is a new shard, we also map all other shards into
+    /// it.
+    fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
+        // pool is a new shard, map all other shards into it
+        if count.is_new() {
+            for server in self.servers.iter() {
+                pool.remap(server)?;
+            }
+        }
+        // pool had schema changes, refresh the import from pool into all other shards
+        if count.had_migrations() {
+            let server = self.server(&pool.shard)?;
+            for pool in self.pools.lock().unwrap().values() {
+                if let Err(e) = pool.remap(server) {
+                    error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string());
+                    return Err(e);
+                }
             }
         }
         Ok(())
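
PoolInner::remap itself is not part of this diff. Cross-shard imports in graph-node are built on postgres_fdw, so remapping a server into a pool conceptually means rebuilding the schema that holds the foreign tables imported from that shard. The naming and SQL below are illustrative, not the statements the real code issues:

use diesel::connection::SimpleConnection;
use diesel::pg::PgConnection;

// Assumes the foreign server for `shard` already exists (configure_fdw sets
// the servers up); re-importing the definitions picks up any schema changes
// made on the other side.
fn remap_sketch(conn: &PgConnection, shard: &str) -> diesel::QueryResult<()> {
    let nsp = format!("shard_{shard}_import"); // hypothetical naming scheme
    conn.batch_execute(&format!(
        "drop schema if exists {nsp} cascade; \
         create schema {nsp}; \
         import foreign schema subgraphs from server \"{shard}\" into {nsp};"
    ))
}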
@@ -1231,4 +1250,11 @@ impl PoolCoordinator {
     pub fn servers(&self) -> Arc<Vec<ForeignServer>> {
         self.servers.clone()
     }
+
+    fn server(&self, shard: &Shard) -> Result<&ForeignServer, StoreError> {
+        self.servers
+            .iter()
+            .find(|server| &server.shard == shard)
+            .ok_or_else(|| constraint_violation!("unknown shard {shard}"))
+    }
 }
