Skip to content

Commit e2c677a

Browse files
authored
uberf-8425: fix created-modified owners tool (#9283)
Signed-off-by: Alexey Zinoviev <[email protected]>
1 parent 858632e commit e2c677a

File tree

2 files changed

+333
-0
lines changed

2 files changed

+333
-0
lines changed

dev/tool/src/db.ts

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { generateToken } from '@hcengineering/server-token'
4242
import { connect } from '@hcengineering/server-tool'
4343
import { type MongoClient } from 'mongodb'
4444
import type postgres from 'postgres'
45+
import { type Row } from 'postgres'
4546
import { getToolToken } from './utils'
4647

4748
export async function moveFromMongoToPG (
@@ -694,6 +695,274 @@ export async function migrateCreatedModifiedBy (
694695
}
695696
}
696697

698+
/** Owner columns rewritten by `migrateCreatedModifiedByFixed`. */
type OwnerFieldFixed = 'createdBy' | 'modifiedBy'

/**
 * Counts the rows of `tableName` in workspace `wsUuid` whose `field` value matches a
 * `current_person_id` of `temp_data.account_personid_mapping_fixed` for the same workspace.
 */
async function countOwnerRowsToFix (
  pgClient: postgres.Sql,
  wsUuid: Workspace['uuid'],
  tableName: string,
  field: OwnerFieldFixed
): Promise<number> {
  // NOTE(review): `pgClient(field)` relies on the client not applying a column-name transform — confirm.
  const [row] = await pgClient`
    SELECT COUNT(*)
    FROM ${pgClient(tableName)} t
    JOIN temp_data.account_personid_mapping_fixed m ON t."workspaceId" = m.workspace_id AND t.${pgClient(field)} = m.current_person_id
    WHERE t."workspaceId" = ${wsUuid}
  `
  // COUNT(*) is a bigint, which the driver may hand back as a string; normalise before comparing.
  return Number(row?.count ?? 0)
}

/**
 * Rewrites one owner column of one table for one workspace and records the
 * (workspace, table, field) triple in `temp_data.account_personid_mapping_fixed_progress`.
 *
 * @param rowsToFix - number of matching rows counted beforehand (0 when the field was skipped)
 * @param alreadyDone - true when the progress table already lists this field and `force` is off
 * @param batchSize - when set and not larger than `rowsToFix`, rows are updated `batchSize` at a time
 * @param onProgress - invoked after every UPDATE statement that completed without throwing
 */
async function applyOwnerFieldFix (
  ctx: MeasureMetricsContext,
  pgClient: postgres.Sql,
  wsUuid: Workspace['uuid'],
  tableName: string,
  field: OwnerFieldFixed,
  rowsToFix: number,
  alreadyDone: boolean,
  batchSize: number | undefined,
  onProgress: () => void
): Promise<void> {
  const markDone = async (): Promise<void> => {
    await pgClient`INSERT INTO temp_data.account_personid_mapping_fixed_progress (workspace_id, domain, field) VALUES (${wsUuid}, ${tableName}, ${field}) ON CONFLICT DO NOTHING`
  }

  if (!(rowsToFix > 0)) {
    if (alreadyDone) {
      ctx.info(` Skipping ${field} for table. Already done`, { tableName })
    } else {
      // Nothing matched the mapping: still record the field so the next run skips the count.
      await markDone()
    }
    return
  }

  ctx.info(` Updating ${field} for ${tableName}...`)
  const startTime = Date.now()
  const table = pgClient(tableName)
  const column = pgClient(field)

  if (batchSize == null || batchSize > rowsToFix) {
    ctx.info(` Processing the whole table ${tableName}...`)
    await pgClient`
      UPDATE ${table}
      SET ${column} = m.correct_person_id::text
      FROM temp_data.account_personid_mapping_fixed m
      WHERE ${table}."workspaceId" = ${wsUuid} AND ${table}."workspaceId" = m.workspace_id AND ${table}.${column} = m.current_person_id
    `
    onProgress()
  } else {
    ctx.info(` Processing the table ${tableName} in batches of ${batchSize}...`)
    let processed = 0
    while (true) {
      const res = await pgClient`
        UPDATE ${table}
        SET ${column} = m.correct_person_id::text
        FROM temp_data.account_personid_mapping_fixed m
        WHERE ${table}."workspaceId" = ${wsUuid} AND ${table}."workspaceId" = m.workspace_id AND ${table}.${column} = m.current_person_id
        LIMIT ${batchSize}
      `
      onProgress()
      // An empty batch means no row matches the mapping any more.
      if (res.count === 0) {
        break
      }
      processed += res.count
      const duration = (Date.now() - startTime) / 1000
      const rate = Math.round(processed / duration)
      ctx.info(` Processing ${field} for ${tableName}: ${processed} rows in ${duration}s (${rate} rows/sec)`)
    }
  }

  await markDone()

  const duration = (Date.now() - startTime) / 1000
  const rate = Math.round(rowsToFix / duration)
  ctx.info(` Updated ${field} for ${tableName}: ${rowsToFix} rows in ${duration}s (${rate} rows/sec)`)
}

/**
 * Rewrites the `createdBy` / `modifiedBy` columns of one workspace using the mapping stored in
 * `temp_data.account_personid_mapping_fixed` (`current_person_id` -> `correct_person_id`).
 *
 * The mapping table is expected to exist already. This function only creates
 * `temp_data.account_personid_mapping_fixed_progress`, where each finished
 * (workspace, table, field) triple is recorded so that later runs skip it unless `force` is set.
 *
 * Failure handling:
 * - errors with code `40001` or `55P03` restart the table scan on the same client, until
 *   `maxRetries` retries have been used;
 * - `CONNECTION_CLOSED` obtains a new client, until `maxReconnects` connections have been made and
 *   only if at least one UPDATE succeeded on the previous connection; otherwise the function logs
 *   a failure and returns without throwing;
 * - every other error is rethrown.
 *
 * @param ctx - logging context
 * @param dbUrl - database URL; must start with `postgresql`
 * @param workspace - workspace whose rows are migrated
 * @param includeDomains - when non-empty, only these tables are processed
 * @param excludeDomains - when non-empty, these tables are skipped
 * @param maxLifetimeSec - when set, passed to `setDBExtraOptions` as `max_lifetime`
 * @param batchSize - optional positive number of rows per UPDATE; omitted means one UPDATE per field
 * @param force - ignore the progress table and process every field again
 * @param maxReconnects - connection limit, see above
 * @param maxRetries - retry limit, see above
 * @throws Error when `dbUrl` is not a `postgresql` URL or `batchSize` is not a positive integer
 */
export async function migrateCreatedModifiedByFixed (
  ctx: MeasureMetricsContext,
  dbUrl: string,
  workspace: Workspace,
  includeDomains?: string[],
  excludeDomains?: string[],
  maxLifetimeSec?: number,
  batchSize?: number,
  force: boolean = false,
  maxReconnects: number = 30,
  maxRetries: number = 50
): Promise<void> {
  if (!dbUrl.startsWith('postgresql')) {
    throw new Error('Only CockroachDB is supported')
  }
  // A batch of 0 rows would end the batch loop at once and mark the field as done without
  // touching any row, so reject unusable sizes before anything is written.
  if (batchSize != null && (!Number.isInteger(batchSize) || batchSize <= 0)) {
    throw new Error(`Batch size must be a positive integer, got ${batchSize}`)
  }

  const wsUuid = workspace.uuid
  ctx.info('Processing workspace', {
    workspaceUuid: workspace.uuid,
    workspaceName: workspace.name,
    workspaceUrl: workspace.url
  })

  if (maxLifetimeSec !== undefined) {
    setDBExtraOptions({ max_lifetime: maxLifetimeSec })
  }

  let progressMade = false
  let connectsCount = 0
  let retriesCount = 0
  let reconnecting = false
  let retrying = false
  let done = false
  let pg: ReturnType<typeof getDBClient> | undefined
  let pgClient: postgres.Sql | undefined

  const markProgress = (): void => {
    progressMade = true
  }

  while (!done && (connectsCount === 0 || retrying || (reconnecting && progressMade))) {
    // Set when the next iteration is a retry that reuses the current client, so that the
    // `finally` below does not release a client which is still going to be used.
    let keepClient = false
    try {
      if (connectsCount === 0 || reconnecting) {
        ctx.info(reconnecting ? ' Reconnecting...' : ' Connecting...')

        progressMade = false
        connectsCount++

        pg = getDBClient(sharedPipelineContextVars, dbUrl)
        pgClient = await pg.getClient()

        // Expect temp table with mapping to be created manually
        // Create progress table
        await pgClient`
          CREATE TABLE IF NOT EXISTS temp_data.account_personid_mapping_fixed_progress (
            workspace_id text,
            domain text,
            field text,
            CONSTRAINT account_personid_mapping_fixed_progress_pk PRIMARY KEY (workspace_id, domain, field)
          )
        `
      }

      if (pgClient == null) {
        throw new Error('Could not connect to postgres')
      }
      const client = pgClient

      if (retrying) {
        retriesCount++
      }

      reconnecting = false
      retrying = false

      // Tables of the public schema that have both owner columns
      const tables = await client`
        SELECT table_name
        FROM information_schema.columns
        WHERE table_schema = 'public'
          AND column_name IN ('createdBy', 'modifiedBy')
        GROUP BY table_name
        HAVING COUNT(DISTINCT column_name) = 2
      `
      let filteredTables: Row[] = tables
      if (includeDomains != null && includeDomains.length > 0) {
        filteredTables = tables.filter((t) => includeDomains.includes(t.table_name))
      }
      if (excludeDomains != null && excludeDomains.length > 0) {
        filteredTables = filteredTables.filter((t) => !excludeDomains.includes(t.table_name))
      }

      ctx.info(` Found ${filteredTables.length} tables to process`, {
        domains: filteredTables.map((t) => t.table_name)
      })

      for (const table of filteredTables) {
        const tableName: string = table.table_name
        ctx.info(` Processing table: ${tableName}`)

        const progress = await client`
          SELECT field
          FROM temp_data.account_personid_mapping_fixed_progress
          WHERE workspace_id = ${wsUuid} AND domain = ${tableName}
        `

        const createdDone = !force && progress.some((p) => p.field === 'createdBy')
        const modifiedDone = !force && progress.some((p) => p.field === 'modifiedBy')

        // Both counts are taken before any update so they can be reported together
        const createdByCount = createdDone ? 0 : await countOwnerRowsToFix(client, wsUuid, tableName, 'createdBy')
        const modifiedByCount = modifiedDone ? 0 : await countOwnerRowsToFix(client, wsUuid, tableName, 'modifiedBy')

        ctx.info(
          ` Table ${tableName}: ${createdByCount} createdBy and ${modifiedByCount} modifiedBy records need updating`
        )

        await applyOwnerFieldFix(
          ctx,
          client,
          wsUuid,
          tableName,
          'createdBy',
          createdByCount,
          createdDone,
          batchSize,
          markProgress
        )
        await applyOwnerFieldFix(
          ctx,
          client,
          wsUuid,
          tableName,
          'modifiedBy',
          modifiedByCount,
          modifiedDone,
          batchSize,
          markProgress
        )
      }

      done = true
      ctx.info('Migration of created/modified completed successfully')
    } catch (err: any) {
      if (err.code === '40001' || err.code === '55P03') {
        // Retry transaction
        if (retriesCount === maxRetries) {
          ctx.error('Failed to migrate created/modified by. Max retries reached', { err })
        } else {
          retrying = true
          // If the failure happened while (re)connecting, `reconnecting` is still set and the
          // next iteration obtains a new client; otherwise the current one is reused.
          keepClient = !reconnecting
          continue
        }
      }

      if (err.code === 'CONNECTION_CLOSED') {
        // Reconnect
        ctx.info(' Connection closed...')
        if (connectsCount === maxReconnects) {
          ctx.error('Failed to migrate created/modified by. Max reconnects reached', { err })
        } else {
          reconnecting = true
          continue
        }
      }

      throw err
    } finally {
      if (!keepClient) {
        pg?.close()
      }
    }
  }

  if (!done) {
    ctx.error('Failed to migrate created/modified by')
  }
}
965+
697966
async function fillAccountSocialKeyMapping (ctx: MeasureMetricsContext, pgClient: postgres.Sql): Promise<void> {
698967
ctx.info('Creating account to social key mapping table...')
699968
// Create schema

dev/tool/src/index.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ import accountPlugin, {
1919
createWorkspaceRecord,
2020
flattenStatus,
2121
getAccountDB,
22+
getWorkspaceById,
2223
getWorkspaceInfoWithStatusById,
24+
getWorkspaces,
25+
getWorkspacesInfoWithStatusByIds,
2326
signUpByEmail,
2427
updateWorkspaceInfo,
2528
type AccountDB,
@@ -56,6 +59,7 @@ import { updateField } from './workspace'
5659

5760
import {
5861
AccountRole,
62+
isArchivingMode,
5963
MeasureMetricsContext,
6064
metricsToString,
6165
SocialIdType,
@@ -103,6 +107,7 @@ import {
103107
ensureGlobalPersonsForLocalAccounts,
104108
filterMergedAccountsInMembers,
105109
migrateCreatedModifiedBy,
110+
migrateCreatedModifiedByFixed,
106111
migrateMergedAccounts,
107112
moveAccountDbFromMongoToPG
108113
} from './db'
@@ -2409,6 +2414,65 @@ export function devTool (
24092414
await migrateCreatedModifiedBy(toolCtx, dbUrl, domains, maxLifetime, batchSize)
24102415
})
24112416

2417+
// migrate-created-modified-by-fixed: runs migrateCreatedModifiedByFixed for the requested
// workspaces (all workspaces when --workspaces is omitted), skipping those in archiving mode
// and starting with the most recently visited ones.
program
  .command('migrate-created-modified-by-fixed')
  .option('--include-domains <includeDomains>', 'Domains to migrate(comma-separated)')
  .option('--exclude-domains <excludeDomains>', 'Domains to skip migration for(comma-separated)')
  .option('--lifetime <lifetime>', 'Max lifetime for the connection in seconds')
  .option('--batch <batch>', 'Batch size')
  .option('--force <force>', 'Force update', false)
  .option('--max-reconnects <maxReconnects>', 'Max reconnects', '30')
  .option('--max-retries <maxRetries>', 'Max retries', '50')
  .option('--workspaces <workspaces>', 'Workspaces to migrate(comma-separated)')
  .action(
    async (cmd: {
      includeDomains?: string
      excludeDomains?: string
      lifetime?: string
      batch?: string
      workspaces?: string
      force: boolean | string
      maxReconnects: string
      maxRetries: string
    }) => {
      const { dbUrl } = prepareTools()
      const includeDomains = cmd.includeDomains?.split(',').map((d) => d.trim())
      const excludeDomains = cmd.excludeDomains?.split(',').map((d) => d.trim())
      const maxLifetime = cmd.lifetime != null ? parseInt(cmd.lifetime, 10) : undefined
      const batchSize = cmd.batch != null ? parseInt(cmd.batch, 10) : undefined
      const maxReconnects = parseInt(cmd.maxReconnects, 10)
      const maxRetries = parseInt(cmd.maxRetries, 10)
      // `--force <force>` takes a value, so it arrives as a string when given on the command
      // line; only the default is a real boolean. Without parsing, `--force false` is truthy.
      const force =
        typeof cmd.force === 'string' ? !['false', '0'].includes(cmd.force.trim().toLowerCase()) : cmd.force
      const wsUuids = cmd.workspaces?.split(',').map((it) => it.trim()) as WorkspaceUuid[] | undefined

      await withAccountDatabase(async (accDb) => {
        const rawWorkspaces =
          wsUuids != null && wsUuids.length > 0
            ? await getWorkspacesInfoWithStatusByIds(accDb, wsUuids)
            : await getWorkspaces(accDb, null, null, null)
        const workspaces = rawWorkspaces
          .filter((it) => !isArchivingMode(it.status.mode))
          .sort((a, b) => (b.status.lastVisit ?? 0) - (a.status.lastVisit ?? 0))

        toolCtx.info('Workspaces found', { count: workspaces.length })

        // One workspace at a time; a thrown error aborts the remaining workspaces.
        for (const workspace of workspaces) {
          await migrateCreatedModifiedByFixed(
            toolCtx,
            dbUrl,
            workspace,
            includeDomains,
            excludeDomains,
            maxLifetime,
            batchSize,
            force,
            maxReconnects,
            maxRetries
          )
        }
      })
    }
  )
2475+
24122476
program.command('ensure-global-persons-for-local-accounts').action(async () => {
24132477
const { dbUrl } = prepareTools()
24142478

0 commit comments

Comments
 (0)