1
- import { Bot , Context , Logger , Session , Universal } from '@satorijs/core'
1
+ import { Bot , Context , Logger , Session , Universal , omit } from '@satorijs/core'
2
2
import { Message } from './types'
3
- import { Span } from './span'
3
+ import { MessageLike , Span } from './span'
4
4
5
- const logger = new Logger ( 'sync' )
5
+ const logger = new Logger ( 'message' )
6
+ logger . level = 3
6
7
7
8
export enum SyncStatus {
8
9
INIT ,
9
10
READY ,
10
11
FAILED ,
11
12
}
12
13
13
- type MessageLike = Message | { sid : bigint }
14
-
15
14
type LocateResult = [ Span , MessageLike ]
16
15
17
16
interface CollectResult {
@@ -28,13 +27,12 @@ export class SyncChannel {
28
27
29
28
private _initTask ?: Promise < void >
30
29
31
- constructor ( public ctx : Context , public bot : Bot , public guildId : string , public data : Universal . Channel ) {
32
- this . _query = { platform : bot . platform , 'channel.id' : data . id }
33
- bot . ctx . emit ( 'satori/database/update' )
30
+ constructor ( public ctx : Context , public bot : Bot , public channelId : string ) {
31
+ this . _query = { platform : bot . platform , 'channel.id' : channelId }
34
32
}
35
33
36
34
private async init ( ) {
37
- logger . debug ( 'init channel %s %s %s ' , this . bot . platform , this . guildId , this . data . id )
35
+ logger . debug ( 'init channel %s %s' , this . bot . platform , this . channelId )
38
36
const data = await this . ctx . database
39
37
. select ( 'satori.message' )
40
38
. where ( {
@@ -76,10 +74,11 @@ export class SyncChannel {
76
74
return left
77
75
}
78
76
79
- insert ( data : Message [ ] , options : Pick < Span , 'prev' | 'next' | 'prevTemp' | 'nextTemp' > = { } , forced : Span . PrevNext < boolean > = { } ) {
77
+ insert ( data : Message [ ] , options : Pick < Span , 'prev' | 'next' | 'prevTemp' | 'nextTemp' > = { } ) {
80
78
if ( ! data . length && ! options . prev && ! options . next ) {
81
79
throw new Error ( 'unexpected empty span' )
82
80
}
81
+ console . log ( 'insert' , data . length )
83
82
const back : Span . Endpoint = [ data . at ( 0 ) ! . sid , data . at ( 0 ) ! . id ]
84
83
const front : Span . Endpoint = [ data . at ( - 1 ) ! . sid , data . at ( - 1 ) ! . id ]
85
84
const span = new Span ( this , Span . Type . LOCAL , front , back , data )
@@ -91,82 +90,100 @@ export class SyncChannel {
91
90
span . nextTemp = options . nextTemp
92
91
span . link ( 'after' , options . next )
93
92
span . merge ( 'after' )
94
- span . flush ( forced )
93
+ console . log ( 'inserted' , span . back [ 0 ] , span . front [ 0 ] , this . _spans . length )
94
+ span . flush ( )
95
95
return span
96
96
}
97
97
98
98
async queue ( session : Session ) {
99
99
const prev = this . hasLatest ? this . _spans [ 0 ] : undefined
100
- const message = Message . from ( session . event . message ! , session . platform , 'after' , prev ?. front [ 0 ] )
100
+ const message = Message . from ( session . event . message ! , session . platform , session . event , 'after' , prev ?. front [ 0 ] )
101
101
this . hasLatest = true
102
- this . insert ( [ message ] , { prev } , { prev : true , next : true } )
103
- }
104
-
105
- getMessageList ( id : string , dir ?: Universal . Direction , limit ?: number ) {
106
- return this . bot . getMessageList ( this . data . id , id , dir , limit , 'asc' )
102
+ this . insert ( [ message ] , { prev } )
107
103
}
108
104
109
- // TODO handle default limit
110
- async list ( id : string , dir : Universal . Direction , limit : number ) {
105
+ async getMessageList ( id ?: string , dir : Universal . Direction = 'before' , limit ?: number , order : Universal . Order = 'asc' ) {
111
106
await ( this . _initTask ||= this . init ( ) )
112
- const result = await this . locate ( id , dir , limit )
113
- if ( ! result ) return [ ]
114
- const [ span , message ] = result
107
+ logger . debug ( 'message.list %s:%s' , this . bot . platform , this . channelId , id , dir , limit , order )
108
+ const location = await this . locate ( id , dir , limit )
109
+ if ( ! location ) return { data : [ ] }
110
+ const [ span , message ] = location
111
+ logger . debug ( 'location %s in [%s, %s]' , message . sid , span . back [ 0 ] , span . front [ 0 ] )
112
+ limit ??= this . ctx . get ( 'satori.database' ) ! . config . message . defaultLimit
115
113
if ( dir === 'around' ) limit = Math . floor ( limit / 2 ) + 1
116
- const beforeTask = dir === 'after' ? Promise . resolve ( [ ] ) : this . extend ( span , message , limit , 'before' )
117
- const afterTask = dir === 'before' ? Promise . resolve ( [ ] ) : this . extend ( span , message , limit , 'after' )
114
+ const beforeTask = dir === 'after' ? Promise . resolve ( [ ] ) : this . extend ( span , message , 'before' , limit , id ? 1 : 0 )
115
+ const afterTask = dir === 'before' ? Promise . resolve ( [ ] ) : this . extend ( span , message , 'after' , limit , 1 )
118
116
const [ before , after ] = await Promise . all ( [ beforeTask , afterTask ] )
119
- if ( dir === 'after' ) return after
120
- if ( dir === 'before' ) return before
121
- return [ ...before . slice ( 0 , - 1 ) , message , ...after . slice ( 1 ) ]
117
+ let result : Universal . TwoWayList < Universal . Message >
118
+ // TODO: support hasEarliest and hasLatest
119
+ if ( dir === 'before' ) {
120
+ result = { data : before , next : before . at ( 0 ) ?. id , prev : before . at ( 0 ) ?. id }
121
+ } else if ( dir === 'after' ) {
122
+ result = { data : after , next : after . at ( - 1 ) ?. id , prev : after . at ( - 1 ) ?. id }
123
+ } else {
124
+ const data = [ ...before , message as Message , ...after ]
125
+ result = { data, next : data . at ( - 1 ) ?. id , prev : data . at ( 0 ) ?. id }
126
+ }
127
+ result . data = result . data . map ( ( message ) => omit ( message , [ 'sid' ] ) )
128
+ if ( order === 'desc' ) result . data . reverse ( )
129
+ return result
122
130
}
123
131
124
- collect ( result : Universal . TwoWayList < Universal . Message > , dir : Span . Direction , data : Message [ ] , index ? : number ) : CollectResult {
132
+ collect ( result : Universal . TwoWayList < Universal . Message > , dir : Span . Direction , data : Message [ ] , index : number ) : CollectResult {
125
133
const w = Span . words [ dir ]
126
- index ??= dir === 'after' ? - 1 : result . data . length
127
- for ( let i = index + w . inc ; i >= 0 && i < result . data . length ; i += w . inc ) {
134
+ for ( let i = index + w . unit ; i >= 0 && i < result . data . length ; i += w . unit ) {
128
135
const span = this . _spans . find ( span => span [ w . back ] [ 1 ] === result . data [ i ] . id )
129
136
if ( span ) {
130
- const data = w . slice ( result . data , i + w . inc )
137
+ const data = w . slice ( result . data , i + w . unit )
131
138
if ( data . length ) {
132
139
span [ w . temp ] = { [ w . next ] : result [ w . next ] , data }
133
140
}
134
141
return { span }
135
142
}
136
- data [ w . push ] ( Message . from ( result . data [ i ] , this . bot . platform , dir , data . at ( w . last ) ?. sid ) )
143
+ data [ w . push ] ( Message . from ( result . data [ i ] , this . bot . platform , undefined , dir , data . at ( w . last ) ?. sid ) )
137
144
}
138
145
return { temp : { data : [ ] , [ w . next ] : result [ w . next ] } }
139
146
}
140
147
141
- private async locate ( id : string , dir : Universal . Direction , limit ?: number ) : Promise < LocateResult | undefined > {
142
- // condition 1: message in memory
143
- for ( const span of this . _spans ) {
144
- const message = span . data ?. find ( message => message . id === id )
145
- if ( message ) return [ span , message ]
146
- }
148
+ private async locate ( id ?: string , dir : Universal . Direction = 'before' , limit ?: number ) : Promise < LocateResult | undefined > {
149
+ if ( id ) {
150
+ // condition 1: message in memory
151
+ for ( const span of this . _spans ) {
152
+ const message = span . data ?. find ( message => message . id === id )
153
+ if ( message ) return [ span , message ]
154
+ }
147
155
148
- // condition 2: message in database
149
- const data = await this . ctx . database
150
- . select ( 'satori.message' )
151
- . where ( { ...this . _query , id } )
152
- . execute ( )
153
- if ( data [ 0 ] ) {
154
- const { sid } = data [ 0 ]
155
- const span = this . _spans [ this . binarySearch ( sid ) ]
156
- if ( ! span || span . back [ 0 ] > sid || span . front [ 0 ] < sid ) throw new Error ( 'malformed sync span' )
157
- return [ span , data [ 0 ] ]
156
+ // condition 2: message in database
157
+ const data = await this . ctx . database
158
+ . select ( 'satori.message' )
159
+ . where ( { ...this . _query , id } )
160
+ . execute ( )
161
+ if ( data [ 0 ] ) {
162
+ const { sid } = data [ 0 ]
163
+ const span = this . _spans [ this . binarySearch ( sid ) ]
164
+ if ( ! span || span . back [ 0 ] > sid || span . front [ 0 ] < sid ) throw new Error ( 'malformed sync span' )
165
+ return [ span , data [ 0 ] ]
166
+ }
167
+ } else if ( this . hasLatest ) {
168
+ return [ this . _spans [ 0 ] , { sid : this . _spans [ 0 ] . front [ 0 ] } ]
158
169
}
159
170
160
171
// condition 3: message not cached, request from adapter
161
172
let span : Span
162
173
let message : MessageLike
163
174
let index : number | undefined
164
- const result = await this . getMessageList ( id , dir , limit )
175
+ const data : Message [ ] = [ ]
176
+ const result = await this . bot . self . getMessageList ( this . channelId , id , dir , limit )
177
+ console . log ( 'raw:' , result . data . length )
165
178
if ( dir === 'around' ) {
166
179
index = result . data . findIndex ( item => item . id === id )
167
180
if ( index === - 1 ) throw new Error ( 'malformed message list' )
168
181
message = Message . from ( result . data [ index ] , this . bot . platform )
169
182
data . push ( message as Message )
183
+ } else if ( dir === 'before' ) {
184
+ index = result . data . length
185
+ } else {
186
+ index = - 1
170
187
}
171
188
172
189
const { span : prev , temp : prevTemp } = this . collect ( result , 'before' , data , index )
@@ -191,12 +208,12 @@ export class SyncChannel {
191
208
return [ span , message ! ]
192
209
}
193
210
194
- private async extend ( span : Span , message : MessageLike , limit : number , dir : Span . Direction ) {
211
+ private async extend ( span : Span , message : MessageLike , dir : Span . Direction , limit : number , exclusive : number ) {
195
212
const buffer : Message [ ] = [ ]
196
213
const w = Span . words [ dir ]
197
214
198
215
while ( true ) {
199
- const data = await span . collect ( message , dir , limit - buffer . length )
216
+ const data = await span . collect ( message , dir , limit - buffer . length , exclusive )
200
217
buffer [ w . push ] ( ...data )
201
218
if ( buffer . length >= limit ) {
202
219
delete span [ w . temp ]
@@ -206,10 +223,11 @@ export class SyncChannel {
206
223
let result = span [ w . temp ]
207
224
if ( result ) {
208
225
let i = dir === 'before' ? result . data . length - 1 : 0
209
- for ( ; i >= 0 && i < result . data . length ; i += w . inc ) {
226
+ for ( ; i >= 0 && i < result . data . length ; i += w . unit ) {
210
227
if ( ! data . some ( item => item . id === result ! . data [ i ] . id ) ) break
211
228
}
212
229
result . data = w . slice ( result . data , i )
230
+ // inherit next?
213
231
if ( ! result . data . length ) result = undefined
214
232
delete span [ w . temp ]
215
233
}
@@ -219,6 +237,7 @@ export class SyncChannel {
219
237
220
238
span = next
221
239
message = { sid : span [ w . back ] [ 0 ] }
240
+ exclusive = 0
222
241
}
223
242
224
243
if ( dir === 'before' ) {
0 commit comments