11import { IMaintenanceOperation , OperationResultType } from "../../OperationAbstractions.js" ;
2- import { Stream } from "node:stream" ;
2+ import { Readable , Stream } from "node:stream" ;
3+ import { createInterface } from "node:readline" ;
34import type { AiAgentActionResponse } from "./AiAgentActionResponse.js" ;
45import type { AiConversationCreationOptions } from "./AiConversationCreationOptions.js" ;
56import type { ConversationResult } from "./ConversationResult.js" ;
7+ import type { AiStreamCallback } from "../AiStreamCallback.js" ;
68import { RavenCommand } from "../../../../Http/RavenCommand.js" ;
79import { DocumentConventions } from "../../../Conventions/DocumentConventions.js" ;
810import { IRaftCommand } from "../../../../Http/IRaftCommand.js" ;
@@ -21,27 +23,39 @@ export class RunConversationOperation<TAnswer> implements IMaintenanceOperation<
2123 private readonly _actionResponses ?: AiAgentActionResponse [ ] ;
2224 private readonly _options ?: AiConversationCreationOptions ;
2325 private readonly _changeVector ?: string ;
26+ private readonly _streamPropertyPath ?: string ;
27+ private readonly _streamCallback ?: AiStreamCallback ;
2428
2529 public constructor (
2630 agentId : string ,
2731 conversationId : string ,
2832 userPrompt ?: string ,
2933 actionResponses ?: AiAgentActionResponse [ ] ,
3034 options ?: AiConversationCreationOptions ,
31- changeVector ?: string
35+ changeVector ?: string ,
36+ streamPropertyPath ?: string ,
37+ streamCallback ?: AiStreamCallback
3238 ) {
3339 if ( StringUtil . isNullOrEmpty ( agentId ) ) {
3440 throwError ( "InvalidArgumentException" , "agentId cannot be null or empty." ) ;
3541 }
3642 if ( StringUtil . isNullOrEmpty ( conversationId ) ) {
3743 throwError ( "InvalidArgumentException" , "conversationId cannot be null or empty." ) ;
3844 }
45+
46+ // Both streamPropertyPath and streamCallback must be specified together or neither
47+ if ( ( streamPropertyPath != null ) !== ( streamCallback != null ) ) {
48+ throwError ( "InvalidOperationException" , "Both streamPropertyPath and streamCallback must be specified together or neither." ) ;
49+ }
50+
3951 this . _agentId = agentId ;
4052 this . _conversationId = conversationId ;
4153 this . _userPrompt = userPrompt ;
4254 this . _actionResponses = actionResponses ;
4355 this . _options = options ;
4456 this . _changeVector = changeVector ;
57+ this . _streamPropertyPath = streamPropertyPath ;
58+ this . _streamCallback = streamCallback ;
4559 }
4660
4761 public get resultType ( ) : OperationResultType {
@@ -56,7 +70,9 @@ export class RunConversationOperation<TAnswer> implements IMaintenanceOperation<
5670 this . _actionResponses ,
5771 this . _options ,
5872 this . _changeVector ,
59- conventions
73+ conventions ,
74+ this . _streamPropertyPath ,
75+ this . _streamCallback
6076 ) ;
6177 }
6278}
@@ -70,6 +86,8 @@ class RunConversationCommand<TAnswer>
7086 private readonly _actionResponses ?: AiAgentActionResponse [ ] ;
7187 private readonly _options ?: AiConversationCreationOptions ;
7288 private readonly _changeVector ?: string ;
89+ private readonly _streamPropertyPath ?: string ;
90+ private readonly _streamCallback ?: AiStreamCallback ;
7391 private _raftId : string ;
7492
7593 public constructor (
@@ -79,7 +97,9 @@ class RunConversationCommand<TAnswer>
7997 actionResponses : AiAgentActionResponse [ ] | undefined ,
8098 options : AiConversationCreationOptions | undefined ,
8199 changeVector : string | undefined ,
82- conventions : DocumentConventions
100+ conventions : DocumentConventions ,
101+ streamPropertyPath ?: string ,
102+ streamCallback ?: AiStreamCallback
83103 ) {
84104 super ( ) ;
85105 this . _conversationId = conversationId ;
@@ -88,6 +108,13 @@ class RunConversationCommand<TAnswer>
88108 this . _actionResponses = actionResponses ;
89109 this . _options = options ;
90110 this . _changeVector = changeVector ;
111+ this . _streamPropertyPath = streamPropertyPath ;
112+ this . _streamCallback = streamCallback ;
113+
114+ // When streaming is enabled, we need to handle raw response
115+ if ( this . _streamPropertyPath && this . _streamCallback ) {
116+ this . _responseType = "Raw" ;
117+ }
91118
92119 if ( this . _conversationId && this . _conversationId . endsWith ( "|" ) ) {
93120 this . _raftId = RaftIdGenerator . newId ( ) ;
@@ -112,12 +139,17 @@ class RunConversationCommand<TAnswer>
112139 uriParams . append ( "changeVector" , this . _changeVector ) ;
113140 }
114141
142+ if ( this . _streamPropertyPath ) {
143+ uriParams . append ( "streaming" , "true" ) ;
144+ uriParams . append ( "streamPropertyPath" , this . _streamPropertyPath ) ;
145+ }
146+
115147 const uri = `${ node . url } /databases/${ node . database } /ai/agent?${ uriParams } ` ;
116148
117149 const bodyObj = {
118150 ActionResponses : this . _actionResponses ,
119151 UserPrompt : this . _prompt ,
120- CreationOptions : this . _options
152+ CreationOptions : this . _options ?? { }
121153 } ;
122154
123155 const headers = this . _headers ( ) . typeAppJson ( ) . build ( ) ;
@@ -144,6 +176,43 @@ class RunConversationCommand<TAnswer>
144176 this . _throwInvalidResponse ( ) ;
145177 }
146178
147- return this . _parseResponseDefaultAsync ( bodyStream )
179+ if ( this . _streamPropertyPath && this . _streamCallback ) {
180+ return await this . _processStreamingResponse ( bodyStream as Readable ) ;
181+ }
182+
183+ return await this . _parseResponseDefaultAsync ( bodyStream ) ;
184+ }
185+
186+ private async _processStreamingResponse ( bodyStream : Readable ) : Promise < string > {
187+ const rl = createInterface ( {
188+ input : bodyStream ,
189+ crlfDelay : Infinity
190+ } ) ;
191+
192+ for await ( const line of rl ) {
193+ if ( ! line || line . trim ( ) . length === 0 ) {
194+ continue ;
195+ }
196+
197+ if ( line . startsWith ( "{" ) ) {
198+ const jsonStream = Readable . from ( [ line ] ) ;
199+ let body : string = null ;
200+ this . result = await this . _defaultPipeline ( _ => body = _ ) . process ( jsonStream ) ;
201+ return body ;
202+ }
203+
204+ try {
205+ const unescaped = JSON . parse ( line ) ;
206+ await this . _streamCallback ! ( unescaped ) ;
207+ } catch ( err ) {
208+ await this . _streamCallback ! ( line ) ;
209+ }
210+ }
211+
212+ if ( ! this . result ) {
213+ throwError ( "InvalidOperationException" , "No final result received in streaming response" ) ;
214+ }
215+
216+ return null ;
148217 }
149218}
0 commit comments