Skip to content

Commit 5ef2d18

Browse files
committed
refa: migrate @satorijs/plugin-database
1 parent 357c73a commit 5ef2d18

File tree

9 files changed

+707
-1
lines changed

9 files changed

+707
-1
lines changed

packages/database/package.json

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"name": "@satorijs/plugin-database",
3+
"description": "Database for Satori protocol",
4+
"version": "0.1.1",
5+
"type": "module",
6+
"module": "lib/index.js",
7+
"files": [
8+
"lib"
9+
],
10+
"author": "Shigma <[email protected]>",
11+
"license": "MIT",
12+
"repository": {
13+
"type": "git",
14+
"url": "git+https://github.com/satorijs/satori.git",
15+
"directory": "packages/database"
16+
},
17+
"bugs": {
18+
"url": "https://github.com/satorijs/satori/issues"
19+
},
20+
"homepage": "https://github.com/satorijs/satori/tree/master/packages/database",
21+
"keywords": [
22+
"satori",
23+
"protocol",
24+
"server",
25+
"basic",
26+
"api",
27+
"database"
28+
],
29+
"cordis": {
30+
"service": {
31+
"required": [
32+
"database"
33+
],
34+
"implements": [
35+
"satori.database"
36+
]
37+
}
38+
},
39+
"devDependencies": {
40+
"@satorijs/plugin-server": "^2.5.0",
41+
"minato": "^3.3.0"
42+
},
43+
"peerDependencies": {
44+
"@satorijs/core": "^4.0.0",
45+
"minato": "^3.3.0"
46+
},
47+
"dependencies": {
48+
"cosmokit": "^1.6.2"
49+
}
50+
}

packages/database/src/channel.ts

+230
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
import { Bot, Context, Logger, Session, Universal } from '@satorijs/core'
2+
import { Message } from './types'
3+
import { Span } from './span'
4+
5+
const logger = new Logger('sync')
6+
7+
export enum SyncStatus {
8+
INIT,
9+
READY,
10+
FAILED,
11+
}
12+
13+
type MessageLike = Message | { sid: bigint }
14+
15+
type LocateResult = [Span, MessageLike]
16+
17+
interface CollectResult {
18+
temp?: Universal.TwoWayList<Universal.Message>
19+
span?: Span
20+
}
21+
22+
export class SyncChannel {
23+
public _spans: Span[] = []
24+
public _query: { platform: string; 'channel.id': string }
25+
26+
public hasLatest = false
27+
public hasEarliest = false
28+
29+
private _initTask?: Promise<void>
30+
31+
constructor(public ctx: Context, public bot: Bot, public guildId: string, public data: Universal.Channel) {
32+
this._query = { platform: bot.platform, 'channel.id': data.id }
33+
bot.ctx.emit('satori/database/update')
34+
}
35+
36+
private async init() {
37+
logger.debug('init channel %s %s %s', this.bot.platform, this.guildId, this.data.id)
38+
const data = await this.ctx.database
39+
.select('satori.message')
40+
.where({
41+
...this._query,
42+
flag: { $bitsAnySet: Message.Flag.FRONT | Message.Flag.BACK },
43+
})
44+
.orderBy('sid', 'asc')
45+
.project(['id', 'sid', 'flag'])
46+
.execute()
47+
while (data.length) {
48+
const { flag, id: frontId, sid: frontUid } = data.pop()!
49+
const front: Span.Endpoint = [frontUid, frontId]
50+
if (!(flag & Message.Flag.FRONT)) {
51+
throw new Error('malformed sync flag')
52+
} else if (flag & Message.Flag.BACK) {
53+
this._spans.push(new Span(this, Span.Type.REMOTE, front, front))
54+
} else {
55+
const { flag, id, sid } = data.pop()!
56+
if (flag & Message.Flag.BACK) {
57+
this._spans.push(new Span(this, Span.Type.REMOTE, front, [sid, id]))
58+
} else {
59+
throw new Error('malformed sync flag')
60+
}
61+
}
62+
}
63+
}
64+
65+
private binarySearch(sid: bigint) {
66+
let left = 0
67+
let right = this._spans.length
68+
while (left < right) {
69+
const mid = Math.floor((left + right) / 2)
70+
if (this._spans[mid].back[0] <= sid) {
71+
right = mid
72+
} else {
73+
left = mid + 1
74+
}
75+
}
76+
return left
77+
}
78+
79+
insert(data: Message[], options: Pick<Span, 'prev' | 'next' | 'prevTemp' | 'nextTemp'> = {}, forced: Span.PrevNext<boolean> = {}) {
80+
if (!data.length && !options.prev && !options.next) {
81+
throw new Error('unexpected empty span')
82+
}
83+
const back: Span.Endpoint = [data.at(0)!.sid, data.at(0)!.id]
84+
const front: Span.Endpoint = [data.at(-1)!.sid, data.at(-1)!.id]
85+
const span = new Span(this, Span.Type.LOCAL, front, back, data)
86+
const index = this.binarySearch(back[0])
87+
this._spans.splice(index, 0, span)
88+
span.prevTemp = options.prevTemp
89+
span.link('before', options.prev)
90+
span.merge('before')
91+
span.nextTemp = options.nextTemp
92+
span.link('after', options.next)
93+
span.merge('after')
94+
span.flush(forced)
95+
return span
96+
}
97+
98+
async queue(session: Session) {
99+
const prev = this.hasLatest ? this._spans[0] : undefined
100+
const message = Message.from(session.event.message!, session.platform, 'after', prev?.front[0])
101+
this.hasLatest = true
102+
this.insert([message], { prev }, { prev: true, next: true })
103+
}
104+
105+
getMessageList(id: string, dir?: Universal.Direction, limit?: number) {
106+
return this.bot.getMessageList(this.data.id, id, dir, limit, 'asc')
107+
}
108+
109+
// TODO handle default limit
110+
async list(id: string, dir: Universal.Direction, limit: number) {
111+
await (this._initTask ||= this.init())
112+
const result = await this.locate(id, dir, limit)
113+
if (!result) return []
114+
const [span, message] = result
115+
if (dir === 'around') limit = Math.floor(limit / 2) + 1
116+
const beforeTask = dir === 'after' ? Promise.resolve([]) : this.extend(span, message, limit, 'before')
117+
const afterTask = dir === 'before' ? Promise.resolve([]) : this.extend(span, message, limit, 'after')
118+
const [before, after] = await Promise.all([beforeTask, afterTask])
119+
if (dir === 'after') return after
120+
if (dir === 'before') return before
121+
return [...before.slice(0, -1), message, ...after.slice(1)]
122+
}
123+
124+
collect(result: Universal.TwoWayList<Universal.Message>, dir: Span.Direction, data: Message[], index?: number): CollectResult {
125+
const w = Span.words[dir]
126+
index ??= dir === 'after' ? -1 : result.data.length
127+
for (let i = index + w.inc; i >= 0 && i < result.data.length; i += w.inc) {
128+
const span = this._spans.find(span => span[w.back][1] === result.data[i].id)
129+
if (span) {
130+
const data = w.slice(result.data, i + w.inc)
131+
if (data.length) {
132+
span[w.temp] = { [w.next]: result[w.next], data }
133+
}
134+
return { span }
135+
}
136+
data[w.push](Message.from(result.data[i], this.bot.platform, dir, data.at(w.last)?.sid))
137+
}
138+
return { temp: { data: [], [w.next]: result[w.next] } }
139+
}
140+
141+
private async locate(id: string, dir: Universal.Direction, limit?: number): Promise<LocateResult | undefined> {
142+
// condition 1: message in memory
143+
for (const span of this._spans) {
144+
const message = span.data?.find(message => message.id === id)
145+
if (message) return [span, message]
146+
}
147+
148+
// condition 2: message in database
149+
const data = await this.ctx.database
150+
.select('satori.message')
151+
.where({ ...this._query, id })
152+
.execute()
153+
if (data[0]) {
154+
const { sid } = data[0]
155+
const span = this._spans[this.binarySearch(sid)]
156+
if (!span || span.back[0] > sid || span.front[0] < sid) throw new Error('malformed sync span')
157+
return [span, data[0]]
158+
}
159+
160+
// condition 3: message not cached, request from adapter
161+
let span: Span
162+
let message: MessageLike
163+
let index: number | undefined
164+
const result = await this.getMessageList(id, dir, limit)
165+
if (dir === 'around') {
166+
index = result.data.findIndex(item => item.id === id)
167+
if (index === -1) throw new Error('malformed message list')
168+
message = Message.from(result.data[index], this.bot.platform)
169+
data.push(message as Message)
170+
}
171+
172+
const { span: prev, temp: prevTemp } = this.collect(result, 'before', data, index)
173+
const { span: next, temp: nextTemp } = this.collect(result, 'after', data, index)
174+
175+
if (data.length || prev && next) {
176+
span = this.insert(data, { prev, next })
177+
} else if (prev || next) {
178+
span = prev || next!
179+
} else {
180+
if (dir === 'before') this.hasEarliest = true
181+
return
182+
}
183+
184+
span.prevTemp = prevTemp
185+
span.nextTemp = nextTemp
186+
if (dir === 'before') {
187+
message = { sid: span.front[0] }
188+
} else if (dir === 'after') {
189+
message = { sid: span.back[0] }
190+
}
191+
return [span, message!]
192+
}
193+
194+
private async extend(span: Span, message: MessageLike, limit: number, dir: Span.Direction) {
195+
const buffer: Message[] = []
196+
const w = Span.words[dir]
197+
198+
while (true) {
199+
const data = await span.collect(message, dir, limit - buffer.length)
200+
buffer[w.push](...data)
201+
if (buffer.length >= limit) {
202+
delete span[w.temp]
203+
break
204+
}
205+
206+
let result = span[w.temp]
207+
if (result) {
208+
let i = dir === 'before' ? result.data.length - 1 : 0
209+
for (; i >= 0 && i < result.data.length; i += w.inc) {
210+
if (!data.some(item => item.id === result!.data[i].id)) break
211+
}
212+
result.data = w.slice(result.data, i)
213+
if (!result.data.length) result = undefined
214+
delete span[w.temp]
215+
}
216+
217+
const next = span[w.next] ?? await (span[w.task] ??= span.extend(dir, limit - buffer.length, result))
218+
if (!next) break
219+
220+
span = next
221+
message = { sid: span[w.back][0] }
222+
}
223+
224+
if (dir === 'before') {
225+
return buffer.slice(-limit)
226+
} else {
227+
return buffer.slice(0, limit)
228+
}
229+
}
230+
}

packages/database/src/guild.ts

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Bot, Universal } from '@satorijs/core'
2+
3+
export class SyncGuild {
4+
public members?: Universal.List<Universal.GuildMember>
5+
6+
constructor(public bot: Bot, public data: Universal.Guild) {
7+
bot.ctx.emit('satori/database/update')
8+
}
9+
10+
async getMembers() {
11+
if (this.members) return this.members
12+
return this.members = await this.bot.getGuildMemberList(this.data.id)
13+
}
14+
}
15+
16+
export namespace SyncGuild {
17+
export interface Data {}
18+
}

0 commit comments

Comments
 (0)