Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cognitive/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@botpress/cognitive",
"version": "0.1.50",
"version": "0.2.0",
"description": "Wrapper around the Botpress Client to call LLMs",
"main": "./dist/index.cjs",
"module": "./dist/index.mjs",
Expand Down
27 changes: 25 additions & 2 deletions packages/cognitive/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,27 @@ export class Cognitive {
}

const betaClient = new CognitiveBeta(this._client.config)
const response = await betaClient.generateText(input as any)
const props: Request = { input }

return {
// Forward beta client events to main client events
betaClient.on('request', () => {
this._events.emit('request', props)
})

betaClient.on('error', (_req, error) => {
this._events.emit('error', props, error)
})

betaClient.on('retry', (_req, error) => {
this._events.emit('retry', props, error)
})

const response = await betaClient.generateText(input as any, {
signal: input.signal,
timeout: this._timeoutMs,
})

const result: Response = {
output: {
id: 'beta-output',
provider: response.metadata.provider,
Expand Down Expand Up @@ -224,6 +242,11 @@ export class Cognitive {
},
},
}

// Emit final response event with actual data
this._events.emit('response', props, result)

return result
}

private async _generateContent(input: InputProps): Promise<Response> {
Expand Down
214 changes: 146 additions & 68 deletions packages/cognitive/src/cognitive-v2/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import axios, { AxiosInstance } from 'axios'
import { backOff } from 'exponential-backoff'
import { createNanoEvents, Unsubscribe } from 'nanoevents'
import { defaultModel, models } from './models'
import { CognitiveRequest, CognitiveResponse, CognitiveStreamChunk, Model } from './types'

export { CognitiveRequest, CognitiveResponse, CognitiveStreamChunk }

export type BetaEvents = {
request: (req: { input: CognitiveRequest }) => void
response: (req: { input: CognitiveRequest }, res: CognitiveResponse) => void
error: (req: { input: CognitiveRequest }, error: any) => void
retry: (req: { input: CognitiveRequest }, error: any) => void
}

type ClientProps = {
apiUrl?: string
timeout?: number
Expand All @@ -31,6 +39,7 @@ export class CognitiveBeta {
private readonly _withCredentials: boolean
private readonly _headers: Record<string, string | string[]>
private readonly _debug: boolean = false
private _events = createNanoEvents<BetaEvents>()

public constructor(props: ClientProps) {
this._apiUrl = props.apiUrl || 'https://api.botpress.cloud'
Expand Down Expand Up @@ -68,17 +77,33 @@ export class CognitiveBeta {
})
}

public on<K extends keyof BetaEvents>(event: K, cb: BetaEvents[K]): Unsubscribe {
return this._events.on(event, cb)
}

public async generateText(input: CognitiveRequest, options: RequestOptions = {}) {
const signal = options.signal ?? AbortSignal.timeout(this._timeout)
const req = { input }

this._events.emit('request', req)

try {
const { data } = await this._withServerRetry(
() =>
this._axiosClient.post<CognitiveResponse>('/v2/cognitive/generate-text', input, {
signal,
timeout: options.timeout ?? this._timeout,
}),
options,
req
)

const { data } = await this._withServerRetry(() =>
this._axiosClient.post<CognitiveResponse>('/v2/cognitive/generate-text', input, {
signal,
timeout: options.timeout ?? this._timeout,
})
)

return data
this._events.emit('response', req, data)
return data
} catch (error) {
this._events.emit('error', req, error)
throw error
}
}

public async listModels() {
Expand All @@ -94,69 +119,102 @@ export class CognitiveBeta {
options: RequestOptions = {}
): AsyncGenerator<CognitiveStreamChunk, void, unknown> {
const signal = options.signal ?? AbortSignal.timeout(this._timeout)
const req = { input: request }
const chunks: CognitiveStreamChunk[] = []
let lastChunk: CognitiveStreamChunk | undefined

this._events.emit('request', req)

try {
if (isBrowser()) {
const res = await fetch(`${this._apiUrl}/v2/cognitive/generate-text-stream`, {
method: 'POST',
headers: {
...this._headers,
'Content-Type': 'application/json',
},
credentials: this._withCredentials ? 'include' : 'omit',
body: JSON.stringify({ ...request, stream: true }),
signal,
})

if (isBrowser()) {
const res = await fetch(`${this._apiUrl}/v2/cognitive/generate-text-stream`, {
method: 'POST',
headers: {
...this._headers,
'Content-Type': 'application/json',
},
credentials: this._withCredentials ? 'include' : 'omit',
body: JSON.stringify({ ...request, stream: true }),
signal,
})

if (!res.ok) {
const text = await res.text().catch(() => '')
const err = new Error(`HTTP ${res.status}: ${text || res.statusText}`)
;(err as any).response = { status: res.status, data: text }
throw err
}
if (!res.ok) {
const text = await res.text().catch(() => '')
const err = new Error(`HTTP ${res.status}: ${text || res.statusText}`)
;(err as any).response = { status: res.status, data: text }
throw err
}

const body = res.body
if (!body) {
throw new Error('No response body received for streaming request')
}
const body = res.body
if (!body) {
throw new Error('No response body received for streaming request')
}

const reader = body.getReader()
const iterable = (async function* () {
for (;;) {
const { value, done } = await reader.read()
if (done) {
break
}
if (value) {
yield value
const reader = body.getReader()
const iterable = (async function* () {
for (;;) {
const { value, done } = await reader.read()
if (done) {
break
}
if (value) {
yield value
}
}
})()

for await (const obj of this._ndjson<CognitiveStreamChunk>(iterable)) {
chunks.push(obj)
lastChunk = obj
yield obj
}
})()

for await (const obj of this._ndjson<CognitiveStreamChunk>(iterable)) {
yield obj
// Emit response event with the final chunk metadata
if (lastChunk?.metadata) {
this._events.emit('response', req, {
output: chunks.map((c) => c.output || '').join(''),
metadata: lastChunk.metadata,
})
}
return
}
return
}

const res = await this._withServerRetry(() =>
this._axiosClient.post(
'/v2/cognitive/generate-text-stream',
{ ...request, stream: true },
{
responseType: 'stream',
signal,
timeout: options.timeout ?? this._timeout,
}
const res = await this._withServerRetry(
() =>
this._axiosClient.post(
'/v2/cognitive/generate-text-stream',
{ ...request, stream: true },
{
responseType: 'stream',
signal,
timeout: options.timeout ?? this._timeout,
}
),
options,
req
)
)

const nodeStream: AsyncIterable<Uint8Array> = res.data as any
if (!nodeStream) {
throw new Error('No response body received for streaming request')
}
const nodeStream: AsyncIterable<Uint8Array> = res.data as any
if (!nodeStream) {
throw new Error('No response body received for streaming request')
}

for await (const obj of this._ndjson<CognitiveStreamChunk>(nodeStream)) {
yield obj
for await (const obj of this._ndjson<CognitiveStreamChunk>(nodeStream)) {
chunks.push(obj)
lastChunk = obj
yield obj
}

// Emit response event with the final chunk metadata
if (lastChunk?.metadata) {
this._events.emit('response', req, {
output: chunks.map((c) => c.output || '').join(''),
metadata: lastChunk.metadata,
})
}
} catch (error) {
this._events.emit('error', req, error)
throw error
}
}

Expand Down Expand Up @@ -214,14 +272,34 @@ export class CognitiveBeta {
return false
}

private async _withServerRetry<T>(fn: () => Promise<T>): Promise<T> {
return backOff(fn, {
numOfAttempts: 3,
startingDelay: 300,
timeMultiple: 2,
jitter: 'full',
retry: (e) => this._isRetryableServerError(e),
})
private async _withServerRetry<T>(
fn: () => Promise<T>,
options: RequestOptions = {},
req?: { input: CognitiveRequest }
): Promise<T> {
let attemptCount = 0
return backOff(
async () => {
try {
const result = await fn()
attemptCount = 0
return result
} catch (error) {
if (attemptCount > 0 && req) {
this._events.emit('retry', req, error)
}
attemptCount++
throw error
}
},
{
numOfAttempts: 3,
startingDelay: 300,
timeMultiple: 2,
jitter: 'full',
retry: (e) => !options.signal?.aborted && this._isRetryableServerError(e),
}
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/cognitive/src/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export type GenerateContentInput = {
type: 'text' | 'image'
/** Indicates the MIME type of the content. If not provided it will be detected from the content-type header of the provided URL. */
mimeType?: string
/** Required if part type is "text" */
/** Required if part type is "text" */
text?: string
/** Required if part type is "image" */
url?: string
Expand Down Expand Up @@ -103,7 +103,7 @@ export type GenerateContentOutput = {
type: 'text' | 'image'
/** Indicates the MIME type of the content. If not provided it will be detected from the content-type header of the provided URL. */
mimeType?: string
/** Required if part type is "text" */
/** Required if part type is "text" */
text?: string
/** Required if part type is "image" */
url?: string
Expand Down
3 changes: 1 addition & 2 deletions packages/llmz/examples/01_chat_basic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
* - Conversation history management
*/

import { CLIChat } from '../utils/cli-chat'
import { Client } from '@botpress/client'
import { execute } from 'llmz'

import { CLIChat } from '../utils/cli-chat'

// Initialize the Botpress Client for LLM interactions
// This client handles authentication and communication with language models
const client = new Client({
Expand Down
2 changes: 1 addition & 1 deletion packages/llmz/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
},
"peerDependencies": {
"@botpress/client": "1.27.0",
"@botpress/cognitive": "0.1.50",
"@botpress/cognitive": "0.2.0",
"@bpinternal/thicktoken": "^1.0.5",
"@bpinternal/zui": "1.2.1"
},
Expand Down
Loading
Loading