7 changes: 6 additions & 1 deletion src/agent.ts
@@ -106,7 +106,12 @@ export class Agent implements IAyaAgent {
})
this.runtime_ = runtime

const knowledgeService = new KnowledgeService(runtime)
const knowledgeService = new KnowledgeService(
runtime,
agentcoinAPI,
agentcoinCookie,
agentcoinIdentity
)

// shutdown handler
let isShuttingDown = false
21 changes: 21 additions & 0 deletions src/apis/agentcoinfun.ts
@@ -13,6 +13,8 @@ import {
HydratedMessage,
HydratedMessageSchema,
Identity,
Knowledge,
KnowledgeSchema,
User,
UserSchema
} from '@/common/types'
@@ -198,4 +200,23 @@ export class AgentcoinAPI {

return wallet
}

async getKnowledges(
identity: Identity,
options: { cookie: string; limit: number; cursor: number }
): Promise<Knowledge[]> {
const response = await fetch(`${AGENTCOIN_FUN_API_URL}/api/agents/knowledge/get`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', Cookie: options.cookie },
body: JSON.stringify({ agentId: identity, limit: options.limit, cursor: options.cursor })
})

if (response.status !== 200) {
throw new Error('Failed to get knowledges')
}

const responseData = await response.json()
const knowledges = KnowledgeSchema.array().parse(responseData)
return knowledges
}
}
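
For illustration (not part of this diff): a minimal sketch of how the new getKnowledges wrapper might be consumed. The api, agentIdentity, and sessionCookie names are assumptions standing in for values the agent bootstrap provides; per the service code further below, the cursor is 0 for the first page and the id of the last received item thereafter.

import { AgentcoinAPI } from '@/apis/agentcoinfun'
import { Identity, Knowledge } from '@/common/types'

// Sketch only: fetch the first page of knowledge items for an agent.
async function fetchFirstPage(
  api: AgentcoinAPI,
  agentIdentity: Identity,
  sessionCookie: string
): Promise<Knowledge[]> {
  return api.getKnowledges(agentIdentity, {
    cookie: sessionCookie,
    limit: 100,
    cursor: 0 // pass the last item's id here to request the next page
  })
}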
2 changes: 0 additions & 2 deletions src/common/constants.ts
@@ -25,5 +25,3 @@ export const RUNTIME_SERVER_SOCKET_FILE = path.join(AGENTCOIN_FUN_DIR, 'runtime-
if (!fs.existsSync(AGENTCOIN_FUN_DIR)) {
fs.mkdirSync(AGENTCOIN_FUN_DIR, { recursive: true })
}

export const KNOWLEDGE_DIR = path.join(AGENTCOIN_FUN_DIR, 'knowledge')
53 changes: 47 additions & 6 deletions src/common/types.ts
@@ -207,14 +207,12 @@ export const GitStateSchema = z.object({

export type GitState = z.infer<typeof GitStateSchema>

export const KnowledgeSchema = z.object({
source: z.string(),
filename: z.string(),
action: z.enum(['create', 'delete']),
updatedAt: z.preprocess((arg) => (isRequiredString(arg) ? new Date(arg) : arg), z.date())
export const UserDmEventSchema = z.object({
channel: DMChannelSchema,
message: HydratedMessageSchema
})

export type Knowledge = z.infer<typeof KnowledgeSchema>
export type UserEvent = z.infer<typeof UserDmEventSchema>

// Character schema

@@ -353,6 +351,49 @@ export enum ServiceKind {
knowledge = 'knowledge-service'
}

const PdfFileSchema = z.object({
kind: z.literal('pdf'),
url: z.string()
})

const TxtFileSchema = z.object({
kind: z.literal('txt'),
url: z.string()
})

const MarkdownFileSchema = z.object({
kind: z.literal('markdown'),
url: z.string()
})

const DocxFileSchema = z.object({
kind: z.literal('docx'),
url: z.string()
})

const CsvFileSchema = z.object({
kind: z.literal('csv'),
url: z.string()
})

export const KnowledgeMetadataSchema = z.discriminatedUnion('kind', [
PdfFileSchema,
TxtFileSchema,
MarkdownFileSchema,
DocxFileSchema,
CsvFileSchema
])

export const KnowledgeSchema = z.object({
id: z.number(),
metadata: KnowledgeMetadataSchema,
name: z.string(),
agentId: AgentIdentitySchema,
createdAt: z.preprocess((arg) => (isRequiredString(arg) ? new Date(arg) : arg), z.date())
})

export type Knowledge = z.infer<typeof KnowledgeSchema>

export const MessageStatusEnumSchema = z.enum(['idle', 'thinking', 'typing'])
export type MessageStatusEnum = z.infer<typeof MessageStatusEnumSchema>

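
For illustration (not part of this diff): a small sketch of what the reshaped KnowledgeSchema accepts. The payload below is made up, and the agentId placeholder may not satisfy AgentIdentitySchema, so safeParse is used; the point is the discriminated union narrowing metadata by its kind field and the createdAt string being preprocessed into a Date.

import { KnowledgeSchema } from '@/common/types'

// Illustrative payload; field values are placeholders, not real API output.
const result = KnowledgeSchema.safeParse({
  id: 42,
  metadata: { kind: 'pdf', url: 'https://example.com/whitepaper.pdf' },
  name: 'whitepaper.pdf',
  agentId: 'agent-identity-placeholder',
  createdAt: '2024-02-01T00:00:00.000Z' // ISO string becomes a Date via preprocess
})

if (result.success && result.data.metadata.kind === 'pdf') {
  // metadata is narrowed to the PDF branch of the union here
  console.log(result.data.metadata.url)
}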
149 changes: 92 additions & 57 deletions src/services/knowledge.ts
@@ -1,5 +1,6 @@
import { KNOWLEDGE_DIR } from '@/common/constants'
import { Knowledge, KnowledgeSchema, ServiceKind } from '@/common/types'
import { AgentcoinAPI } from '@/apis/agentcoinfun'
import { AgentcoinRuntime } from '@/common/runtime'
import { Identity, Knowledge, ServiceKind } from '@/common/types'
import {
elizaLogger,
embed,
@@ -33,7 +34,12 @@ export class KnowledgeService extends Service {

async initialize(_: IAgentRuntime): Promise<void> {}

constructor(private readonly runtime: IAgentRuntime) {
constructor(
private readonly runtime: AgentcoinRuntime,
private readonly agentCoinApi: AgentcoinAPI,
private readonly agentCoinCookie: string,
private readonly agentCoinIdentity: Identity
) {
super()
if (this.runtime.ragKnowledgeManager instanceof RAGKnowledgeManager) {
this.knowledgeRoot = this.runtime.ragKnowledgeManager.knowledgeRoot
@@ -47,82 +53,111 @@ export class KnowledgeService extends Service {
return
}

console.log('📌 Knowledge indexing job started...')
elizaLogger.info('📌 Knowledge sync job started...')
this.isRunning = true

while (this.isRunning) {
try {
await this.processJsonFiles(KNOWLEDGE_DIR)
await this.syncKnowledge()
} catch (error) {
console.error('⚠️ Error in indexing job:', error)
if (error instanceof Error) {
elizaLogger.error('⚠️ Error in sync job:', error.message)
} else {
elizaLogger.error('⚠️ Error in sync job:', error)
}
}

// Wait for 1 minute before the next run
await new Promise((resolve) => setTimeout(resolve, 60_000))
}

console.log('✅ Indexing job stopped gracefully.')
elizaLogger.info('✅ Sync job stopped gracefully.')
}

async stop(): Promise<void> {
this.isRunning = false
elizaLogger.info('Knowledge service stopped')
elizaLogger.info('Knowledge sync service stopped')
}

private async processJsonFiles(jsonDirectory: string): Promise<void> {
try {
const dirExists = await fs
.access(jsonDirectory)
.then(() => true)
.catch(() => false)
if (!dirExists) {
return
private async getAllKnowledge(): Promise<Knowledge[]> {
const allKnowledge: Knowledge[] = []
let cursor = 0
const limit = 100

while (true) {
const knowledges = await this.agentCoinApi.getKnowledges(this.agentCoinIdentity, {
cookie: this.agentCoinCookie,
limit,
cursor
})

allKnowledge.push(...knowledges)

if (knowledges.length < limit) {
break
}

const files = (await fs.readdir(jsonDirectory)).filter((file) => file.endsWith('.json'))
cursor = knowledges[knowledges.length - 1].id
}

for (const jsonFile of files) {
const filePath = path.join(jsonDirectory, jsonFile)
const metadata = await fs.readFile(filePath, 'utf-8')
elizaLogger.info(`Found ${allKnowledge.length} knowledges`)

let data: Knowledge
try {
data = KnowledgeSchema.parse(JSON.parse(metadata))
} catch (error) {
console.error(`Invalid JSON format in ${jsonFile}:`, error)
continue
}
return allKnowledge
}

const itemId = stringToUuid(jsonFile)
const existingKnowledge = await this.runtime.databaseAdapter.getKnowledge({
id: itemId,
private async syncKnowledge(): Promise<void> {
elizaLogger.info('Syncing knowledge...')
try {
const [knowledges, existingKnowledges] = await Promise.all([
this.getAllKnowledge(),
this.runtime.databaseAdapter.getKnowledge({
agentId: this.runtime.agentId
})
])

switch (data.action) {
case 'delete': {
if (existingKnowledge.length > 0) {
console.log(`Deleting knowledge item ${itemId}`)
await fs.unlink(path.join(this.knowledgeRoot, data.filename))
await this.runtime.ragKnowledgeManager.cleanupDeletedKnowledgeFiles()
}
break
}
case 'create': {
if (existingKnowledge.length === 0) {
await this.processFileMetadata(data, itemId)
}
break
}
const existingParentKnowledges = existingKnowledges.filter(
(knowledge) => !knowledge.content.metadata?.isChunk
)
const existingKnowledgeIds = existingParentKnowledges.map((knowledge) => knowledge.id)

const remoteKnowledgeIds: UUID[] = []
for (const knowledge of knowledges) {
const itemId = stringToUuid(knowledge.metadata.url)
remoteKnowledgeIds.push(itemId)

if (!existingKnowledgeIds.includes(itemId)) {
elizaLogger.info(`Processing new knowledge: ${knowledge.name}`)
await this.processFileKnowledge(knowledge, itemId)
}
}

const knowledgesToRemove = existingParentKnowledges.filter(
(knowledge) => !remoteKnowledgeIds.includes(knowledge.id)
)

for (const knowledge of knowledgesToRemove) {
elizaLogger.info(`Removing knowledge: ${knowledge.content.metadata?.source}`)

await this.runtime.databaseAdapter.removeKnowledge(knowledge.id)

await fs.unlink(path.join(this.knowledgeRoot, knowledge.content.metadata?.source))
await this.runtime.ragKnowledgeManager.cleanupDeletedKnowledgeFiles()
}
elizaLogger.info(
`Knowledge sync completed: ${remoteKnowledgeIds.length} remote items, ` +
`${knowledgesToRemove.length} items removed`
)
} catch (error) {
console.error('Error processing JSON files:', error)
if (error instanceof Error) {
elizaLogger.error('Error processing knowledge files:', error.message)
} else {
elizaLogger.error('Error processing knowledge files:', error)
}
throw error
}
}

private async processFileMetadata(data: Knowledge, itemId: UUID): Promise<void> {
private async processFileKnowledge(data: Knowledge, itemId: UUID): Promise<void> {
try {
const content = await this.downloadFile(data)

@@ -131,7 +166,7 @@ export class KnowledgeService extends Service {
agentId: this.runtime.agentId,
content: {
text: '',
metadata: { source: data.filename }
metadata: { source: data.name }
},
embedding: new Float32Array(getEmbeddingZeroVector()),
createdAt: Date.now()
@@ -154,7 +189,7 @@ export class KnowledgeService extends Service {
text: chunk,
metadata: {
isChunk: true,
source: data.filename,
source: data.name,
originalId: itemId,
chunkIndex: index
}
@@ -165,18 +200,18 @@ export class KnowledgeService extends Service {
})
)
} catch (error) {
console.error(`Error processing file metadata for ${data.filename}:`, error)
elizaLogger.error(`Error processing file metadata for ${data.name}:`, error)
}
}

private async downloadFile(file: Knowledge): Promise<string> {
await fs.mkdir(this.knowledgeRoot, { recursive: true })
const outputPath = path.join(this.knowledgeRoot, file.filename)
const outputPath = path.join(this.knowledgeRoot, file.name)

try {
const response = await axios({
method: 'GET',
url: file.source,
url: file.metadata.url,
responseType: 'arraybuffer'
})

@@ -194,9 +229,9 @@ export class KnowledgeService extends Service {
return ext in loaderMap
}

const fileExtension = path.extname(file.filename).toLowerCase()
const fileExtension = path.extname(file.name).toLowerCase()
if (!isValidFileExtension(fileExtension)) {
console.error(`Unsupported file type: ${fileExtension}`)
elizaLogger.error(`Unsupported file type: ${fileExtension}`)
throw new Error(`Unsupported file type: ${fileExtension}`)
}

@@ -206,14 +241,14 @@ export class KnowledgeService extends Service {
const loader = new LoaderClass(outputPath)
const docs = await loader.load()
const content = docs.map((doc) => doc.pageContent).join('\n')
console.log(`Successfully processed file: ${file.filename}`)
elizaLogger.info(`Successfully processed file: ${file.name}`)
return content
} catch (error) {
console.error(`Error parsing ${fileExtension} file: ${file.filename}`, error)
elizaLogger.error(`Error parsing ${fileExtension} file: ${file.name}`, error)
return ''
}
} catch (error) {
console.error(`Error processing file from ${file.source}:`, error)
elizaLogger.error(`Error processing file from ${file.metadata.url}:`, error)
throw error
}
}
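
To condense the reconciliation performed by syncKnowledge above, here is a sketch of the set-difference step in isolation (not part of this diff). It assumes stringToUuid and the UUID type come from @elizaos/core, as the service's own imports suggest; the real method also downloads, chunks, and embeds each new item and cleans up deleted files on disk.

import { stringToUuid, UUID } from '@elizaos/core'
import { Knowledge } from '@/common/types'

// Remote items are keyed by a UUID derived from their URL; any existing parent
// record whose id no longer appears remotely is scheduled for removal.
function diffKnowledge(
  remote: Knowledge[],
  existingParentIds: UUID[]
): { toAdd: Knowledge[]; toRemoveIds: UUID[] } {
  const remoteIds = remote.map((k) => stringToUuid(k.metadata.url))
  const toAdd = remote.filter((_, i) => !existingParentIds.includes(remoteIds[i]))
  const toRemoveIds = existingParentIds.filter((id) => !remoteIds.includes(id))
  return { toAdd, toRemoveIds }
}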