Skip to content

Commit

Permalink
PDS: ActorStore Refactor (#3492)
Browse files Browse the repository at this point in the history
* Similification of the `pds` package by:
- Removing `DetailedAccountStore` class (moving its logic inside `AccountManager`)
- Factorizes image URL building into its own class (for easy re-use from `AccountManager`)
- Adds an `AppView` class that exposes an `agent: AtpAgent` and url builder function (used by the `ImageUrlBuilder`).
- Reworks the `ActorStore` to avoid circular dependency between `AccountManager` and `LocalViewerCreator` (needed because of first item)

* tidy

* move classes in their own file
  • Loading branch information
matthieusieben authored Feb 5, 2025
1 parent 8a30e0e commit 53a577f
Show file tree
Hide file tree
Showing 37 changed files with 522 additions and 530 deletions.
5 changes: 5 additions & 0 deletions .changeset/serious-pears-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Code refactor
41 changes: 36 additions & 5 deletions packages/pds/src/account-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { HOUR, wait } from '@atproto/common'
import {
Account,
AccountInfo,
AccountStore,
Code,
Expand All @@ -20,12 +21,15 @@ import {
UpdateRequestData,
} from '@atproto/oauth-provider'
import { AuthRequiredError } from '@atproto/xrpc-server'
import { Selectable } from 'kysely'
import { CID } from 'multiformats/cid'
import { KeyObject } from 'node:crypto'

import { ActorStore } from '../actor-store/actor-store'
import { AuthScope } from '../auth-verifier'
import { BackgroundQueue } from '../background'
import { softDeleted } from '../db'
import { ImageUrlBuilder } from '../image/image-url-builder'
import { StatusAttr } from '../lexicon/types/com/atproto/admin/defs'
import { AccountDb, EmailTokenPurpose, getDb, getMigrator } from './db'
import * as account from './helpers/account'
Expand All @@ -50,6 +54,8 @@ export class AccountManager
db: AccountDb

constructor(
private actorStore: ActorStore,
private imageUrlBuilder: ImageUrlBuilder,
private backgroundQueue: BackgroundQueue,
dbLocation: string,
private jwtKey: KeyObject,
Expand Down Expand Up @@ -500,6 +506,29 @@ export class AccountManager

// AccountStore

private async buildAccount(row: Selectable<ActorAccount>): Promise<Account> {
const account = deviceAccount.toAccount(row, this.serviceDid)

if (!account.name || !account.picture) {
const did = account.sub

const profile = await this.actorStore.read(did, async (store) => {
return store.record.getProfileRecord()
})

if (profile) {
const { avatar, displayName } = profile

account.name ||= displayName
account.picture ||= avatar
? this.imageUrlBuilder.build('avatar', did, avatar.ref.toString())
: undefined
}
}

return account
}

async authenticateAccount(
{ username: identifier, password, remember = false }: SignInCredentials,
deviceId: DeviceId,
Expand Down Expand Up @@ -554,7 +583,7 @@ export class AccountManager
if (!row) return null

return {
account: deviceAccount.toAccount(row, this.serviceDid),
account: await this.buildAccount(row),
info: deviceAccount.toDeviceAccountInfo(row),
}
}
Expand All @@ -564,10 +593,12 @@ export class AccountManager
.listRememberedQB(this.db, deviceId)
.execute()

return rows.map((row) => ({
account: deviceAccount.toAccount(row, this.serviceDid),
info: deviceAccount.toDeviceAccountInfo(row),
}))
return Promise.all(
rows.map(async (row) => ({
account: await this.buildAccount(row),
info: deviceAccount.toDeviceAccountInfo(row),
})),
)
}

async removeDeviceAccount(deviceId: DeviceId, sub: string): Promise<void> {
Expand Down
45 changes: 45 additions & 0 deletions packages/pds/src/actor-store/actor-store-reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Keypair } from '@atproto/crypto'
import { ActorStoreResources } from './actor-store-resources'
import { ActorDb } from './db'
import { PreferenceReader } from './preference/reader'
import { RecordReader } from './record/reader'
import { RepoReader } from './repo/reader'
import { ActorStoreTransactor } from './actor-store-transactor'

export class ActorStoreReader {
public readonly repo: RepoReader
public readonly record: RecordReader
public readonly pref: PreferenceReader

constructor(
public readonly did: string,
protected readonly db: ActorDb,
protected readonly resources: ActorStoreResources,
public readonly keypair: () => Promise<Keypair>,
) {
const blobstore = resources.blobstore(did)

this.repo = new RepoReader(db, blobstore)
this.record = new RecordReader(db)
this.pref = new PreferenceReader(db)

// Invoke "keypair" once. Also avoids leaking "this" as keypair context.
let keypairPromise: Promise<Keypair>
this.keypair = () => (keypairPromise ??= Promise.resolve().then(keypair))
}

async transact<T>(
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
): Promise<T> {
const keypair = await this.keypair()
return this.db.transaction((dbTxn) => {
const store = new ActorStoreTransactor(
this.did,
dbTxn,
keypair,
this.resources,
)
return fn(store)
})
}
}
8 changes: 8 additions & 0 deletions packages/pds/src/actor-store/actor-store-resources.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { BlobStore } from '@atproto/repo'
import { BackgroundQueue } from '../background'

export type ActorStoreResources = {
blobstore: (did: string) => BlobStore
backgroundQueue: BackgroundQueue
reservedKeyDir?: string
}
31 changes: 31 additions & 0 deletions packages/pds/src/actor-store/actor-store-transactor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Keypair } from '@atproto/crypto'
import { ActorStoreResources } from './actor-store-resources'
import { ActorDb } from './db'
import { PreferenceTransactor } from './preference/transactor'
import { RecordTransactor } from './record/transactor'
import { RepoTransactor } from './repo/transactor'

export class ActorStoreTransactor {
public readonly record: RecordTransactor
public readonly repo: RepoTransactor
public readonly pref: PreferenceTransactor

constructor(
public readonly did: string,
protected readonly db: ActorDb,
protected readonly keypair: Keypair,
protected readonly resources: ActorStoreResources,
) {
const blobstore = resources.blobstore(did)

this.record = new RecordTransactor(db, blobstore)
this.pref = new PreferenceTransactor(db)
this.repo = new RepoTransactor(
db,
blobstore,
did,
keypair,
resources.backgroundQueue,
)
}
}
17 changes: 17 additions & 0 deletions packages/pds/src/actor-store/actor-store-writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { ActorStoreTransactor } from './actor-store-transactor'

export class ActorStoreWriter extends ActorStoreTransactor {
async transact<T>(
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
): Promise<T> {
return this.db.transaction((dbTxn) => {
const transactor = new ActorStoreTransactor(
this.did,
dbTxn,
this.keypair,
this.resources,
)
return fn(transactor)
})
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
import path from 'path'
import assert from 'assert'
import fs from 'fs/promises'
import * as crypto from '@atproto/crypto'
import { Keypair, ExportableKeypair } from '@atproto/crypto'
import { BlobStore } from '@atproto/repo'
import {
chunkArray,
fileExists,
readIfExists,
rmIfExists,
} from '@atproto/common'
import { ActorDb, getDb, getMigrator } from './db'
import { BackgroundQueue } from '../background'
import { RecordReader } from './record/reader'
import { PreferenceReader } from './preference/reader'
import { RepoReader } from './repo/reader'
import { RepoTransactor } from './repo/transactor'
import { PreferenceTransactor } from './preference/transactor'
import * as crypto from '@atproto/crypto'
import { ExportableKeypair, Keypair } from '@atproto/crypto'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { RecordTransactor } from './record/transactor'
import { CID } from 'multiformats/cid'
import DiskBlobStore from '../disk-blobstore'
import { mkdir } from 'fs/promises'
import assert from 'node:assert'
import fs, { mkdir } from 'node:fs/promises'
import path from 'node:path'
import { ActorStoreConfig } from '../config'
import { retrySqlite } from '../db'

type ActorStoreResources = {
blobstore: (did: string) => BlobStore
backgroundQueue: BackgroundQueue
reservedKeyDir?: string
}
import DiskBlobStore from '../disk-blobstore'
import { ActorStoreReader } from './actor-store-reader'
import { ActorStoreResources } from './actor-store-resources'
import { ActorStoreTransactor } from './actor-store-transactor'
import { ActorStoreWriter } from './actor-store-writer'
import { ActorDb, getDb, getMigrator } from './db'

export class ActorStore {
reservedKeyDir: string
Expand Down Expand Up @@ -82,50 +70,39 @@ export class ActorStore {
return db
}

async read<T>(did: string, fn: ActorStoreReadFn<T>) {
async read<T>(did: string, fn: (fn: ActorStoreReader) => T | PromiseLike<T>) {
const db = await this.openDb(did)
try {
const reader = createActorReader(did, db, this.resources, () =>
this.keypair(did),
)
return await fn(reader)
const getKeypair = () => this.keypair(did)
return await fn(new ActorStoreReader(did, db, this.resources, getKeypair))
} finally {
db.close()
}
}

async transact<T>(did: string, fn: ActorStoreTransactFn<T>) {
async transact<T>(
did: string,
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
) {
const keypair = await this.keypair(did)
const db = await this.openDb(did)
try {
return await db.transaction((dbTxn) => {
const store = createActorTransactor(did, dbTxn, keypair, this.resources)
return fn(store)
return fn(new ActorStoreTransactor(did, dbTxn, keypair, this.resources))
})
} finally {
db.close()
}
}

async writeNoTransaction<T>(did: string, fn: ActorStoreWriterFn<T>) {
async writeNoTransaction<T>(
did: string,
fn: (fn: ActorStoreWriter) => T | PromiseLike<T>,
) {
const keypair = await this.keypair(did)
const db = await this.openDb(did)
try {
const writer = createActorTransactor(did, db, keypair, this.resources)
return await fn({
...writer,
transact: async <T>(fn: ActorStoreTransactFn<T>): Promise<T> => {
return db.transaction((dbTxn) => {
const transactor = createActorTransactor(
did,
dbTxn,
keypair,
this.resources,
)
return fn(transactor)
})
},
})
return await fn(new ActorStoreWriter(did, db, keypair, this.resources))
} finally {
db.close()
}
Expand Down Expand Up @@ -157,10 +134,9 @@ export class ActorStore {
if (blobstore instanceof DiskBlobStore) {
await blobstore.deleteAll()
} else {
const blobRows = await this.read(did, (store) =>
store.db.db.selectFrom('blob').select('cid').execute(),
const cids = await this.read(did, async (store) =>
store.repo.blob.getBlobCids(),
)
const cids = blobRows.map((row) => CID.parse(row.cid))
await Promise.allSettled(
chunkArray(cids, 500).map((chunk) => blobstore.deleteMany(chunk)),
)
Expand Down Expand Up @@ -226,73 +202,6 @@ const loadKey = async (loc: string): Promise<ExportableKeypair | undefined> => {
return crypto.Secp256k1Keypair.import(privKey, { exportable: true })
}

const createActorTransactor = (
did: string,
db: ActorDb,
keypair: Keypair,
resources: ActorStoreResources,
): ActorStoreTransactor => {
const { blobstore, backgroundQueue } = resources
const userBlobstore = blobstore(did)
return {
did,
db,
repo: new RepoTransactor(db, did, keypair, userBlobstore, backgroundQueue),
record: new RecordTransactor(db, userBlobstore),
pref: new PreferenceTransactor(db),
}
}

const createActorReader = (
did: string,
db: ActorDb,
resources: ActorStoreResources,
getKeypair: () => Promise<Keypair>,
): ActorStoreReader => {
const { blobstore } = resources
return {
did,
db,
repo: new RepoReader(db, blobstore(did)),
record: new RecordReader(db),
pref: new PreferenceReader(db),
keypair: getKeypair,
transact: async <T>(fn: ActorStoreTransactFn<T>): Promise<T> => {
const keypair = await getKeypair()
return db.transaction((dbTxn) => {
const store = createActorTransactor(did, dbTxn, keypair, resources)
return fn(store)
})
},
}
}

export type ActorStoreReadFn<T> = (fn: ActorStoreReader) => Promise<T>
export type ActorStoreTransactFn<T> = (fn: ActorStoreTransactor) => Promise<T>
export type ActorStoreWriterFn<T> = (fn: ActorStoreWriter) => Promise<T>

export type ActorStoreReader = {
did: string
db: ActorDb
repo: RepoReader
record: RecordReader
pref: PreferenceReader
keypair: () => Promise<Keypair>
transact: <T>(fn: ActorStoreTransactFn<T>) => Promise<T>
}

export type ActorStoreTransactor = {
did: string
db: ActorDb
repo: RepoTransactor
record: RecordTransactor
pref: PreferenceTransactor
}

export type ActorStoreWriter = ActorStoreTransactor & {
transact: <T>(fn: ActorStoreTransactFn<T>) => Promise<T>
}

function assertSafePathPart(part: string) {
const normalized = path.normalize(part)
assert(
Expand Down
Loading

0 comments on commit 53a577f

Please sign in to comment.