1
1
import { Controller , Session , Req , Res , Post , Get } from 'routing-controllers'
2
2
import { Request , Response } from 'express'
3
- import { pipeline } from "node:stream/promises"
3
+ import { Transform , pipeline , Readable } from "node:stream"
4
+ import { ReadableStream as WebReadableStream } from "stream/web"
4
5
import { Service } from 'typedi'
5
6
import OBPClientService from '../services/OBPClientService'
6
7
import OpeyClientService from '../services/OpeyClientService'
7
8
8
9
import { UserInput } from '../schema/OpeySchema'
10
+ import { APIApi , Configuration , ConsentApi , ConsumerConsentrequestsBody , InlineResponse20151 } from 'obp-api-typescript'
9
11
10
12
@Service ( )
11
13
@Controller ( '/opey' )
@@ -42,9 +44,11 @@ export class OpeyController {
42
44
@Res ( ) response : Response ,
43
45
) {
44
46
47
+ // Read user input from request body
45
48
let user_input : UserInput
46
49
try {
47
- user_input = {
50
+ console . log ( "Request body: " , request . body )
51
+ user_input = {
48
52
"message" : request . body . message ,
49
53
"thread_id" : request . body . thread_id ,
50
54
"is_tool_call_approval" : request . body . is_tool_call_approval
@@ -54,45 +58,73 @@ export class OpeyController {
54
58
return response . status ( 500 ) . json ( { error : 'Internal Server Error' } )
55
59
}
56
60
57
-
58
- console . log ( "Calling OpeyClientService.stream" )
59
61
60
- // const streamMiddlewareTransform = new Transform({
61
- // transform(chunk, encoding, callback) {
62
- // console.log(`Logged Chunk: ${chunk}`)
63
- // this.push(chunk);
64
-
65
- // callback();
66
- // }
67
- // })
62
+ // Define a function to transform the response from Opey (which is a text stream) into a TS-Native langchain stream
63
+ const frontendTransformer = new TransformStream ( {
64
+ transform ( chunk , controller ) {
65
+ // Decode the chunk to a string
66
+ const decodedChunk = new TextDecoder ( ) . decode ( chunk )
67
+
68
+ console . log ( "Sending chunk" , decodedChunk )
69
+ controller . enqueue ( decodedChunk ) ;
70
+ } ,
71
+ flush ( controller ) {
72
+ console . log ( '[flush]' ) ;
73
+ // Close ReadableStream when done
74
+ controller . terminate ( ) ;
75
+ } ,
76
+ } ) ;
77
+
68
78
69
- let stream : NodeJS . ReadableStream | null = null
79
+ let stream : ReadableStream | null = null
70
80
71
81
try {
72
- // Read stream from OpeyClientService
82
+ // Read web stream from OpeyClientService
83
+ console . log ( "Calling OpeyClientService.stream" )
73
84
stream = await this . opeyClientService . stream ( user_input )
74
- console . debug ( `Stream received readable: ${ stream ?. readable } ` )
75
85
76
86
} catch ( error ) {
77
87
console . error ( "Error reading stream: " , error )
78
88
return response . status ( 500 ) . json ( { error : 'Internal Server Error' } )
79
89
}
80
90
81
- if ( ! stream || ! stream . readable ) {
91
+ if ( ! stream ) {
82
92
console . error ( "Stream is not recieved or not readable" )
83
93
return response . status ( 500 ) . json ( { error : 'Internal Server Error' } )
84
94
}
85
95
96
+
97
+ // Transform our stream if needed, right now this is just a passthrough
98
+ const frontendStream : ReadableStream = stream . pipeThrough ( frontendTransformer )
99
+
100
+ // If we need to split the stream into two, we can use the tee method as below
101
+
102
+ // const streamTee = langchainStream.tee()
103
+ // if (!streamTee) {
104
+ // console.error("Stream is not tee'd")
105
+ // return response.status(500).json({ error: 'Internal Server Error' })
106
+ // }
107
+ // const [stream1, stream2] = streamTee
108
+
109
+
110
+
111
+ const nodeStream = Readable . fromWeb ( frontendStream as WebReadableStream < any > )
112
+
113
+ response . setHeader ( 'x-vercel-ai-data-stream' , 'v1' )
114
+ response . setHeader ( 'Content-Type' , 'text/event-stream' ) ;
115
+ response . setHeader ( 'Cache-Control' , 'no-cache' ) ;
116
+ response . setHeader ( 'Connection' , 'keep-alive' ) ;
117
+ nodeStream . pipe ( response ) ;
118
+
119
+
86
120
return new Promise < Response > ( ( resolve , reject ) => {
87
- stream . pipe ( response )
88
- stream . on ( 'end' , ( ) => {
89
- response . status ( 200 )
90
- resolve ( response )
91
- } )
92
- stream . on ( 'error' , ( error ) => {
93
- console . error ( "Error piping stream: " , error )
94
- reject ( error )
95
- } )
121
+ nodeStream . on ( 'end' , ( ) => {
122
+ resolve ( response ) ;
123
+ } ) ;
124
+ nodeStream . on ( 'error' , ( error ) => {
125
+ console . error ( 'Stream error:' , error ) ;
126
+ reject ( error ) ;
127
+ } ) ;
96
128
97
129
} )
98
130
@@ -129,6 +161,67 @@ export class OpeyController {
129
161
}
130
162
}
131
163
164
+ @Post ( '/consent/request' )
165
+ /**
166
+ * Retrieves a consent request from OBP
167
+ *
168
+ */
169
+ async getConsentRequest (
170
+ @Session ( ) session : any ,
171
+ @Req ( ) request : Request ,
172
+ @Res ( ) response : Response ,
173
+ ) : Promise < Response | any > {
174
+ try {
175
+
176
+ let obpToken : string
177
+
178
+ obpToken = await this . obpClientService . getDirectLoginToken ( )
179
+ console . log ( "Got token: " , obpToken )
180
+ const authHeader = `DirectLogin token="${ obpToken } "`
181
+ console . log ( "Auth header: " , authHeader )
182
+
183
+ const obpOAuthHeaders = await this . obpClientService . getOAuthHeader ( '/consents' , 'POST' )
184
+ console . log ( "OBP OAuth Headers: " , obpOAuthHeaders )
185
+
186
+ const obpConfig : Configuration = {
187
+ apiKey : authHeader ,
188
+ basePath : process . env . VITE_OBP_API_HOST ,
189
+ }
190
+
191
+ console . log ( "OBP Config: " , obpConfig )
192
+
193
+ const consentAPI = new ConsentApi ( obpConfig , process . env . VITE_OBP_API_HOST )
194
+
195
+
196
+ // OBP sdk naming is a bit mad, can be rectified in the future
197
+ const consentRequestResponse = await consentAPI . oBPv500CreateConsentRequest ( {
198
+ accountAccess : [ ] ,
199
+ everything : false ,
200
+ entitlements : [ ] ,
201
+ consumerId : '' ,
202
+ } as unknown as ConsumerConsentrequestsBody ,
203
+ {
204
+ headers : {
205
+ 'Content-Type' : 'application/json' ,
206
+ } ,
207
+ }
208
+ )
209
+
210
+ //console.log("Consent request response: ", consentRequestResponse)
211
+
212
+ console . log ( { consentId : consentRequestResponse . data . consent_request_id } )
213
+ session [ 'obpConsentRequestId' ] = consentRequestResponse . data . consent_request_id
214
+
215
+ return response . status ( 200 ) . json ( JSON . stringify ( { consentId : consentRequestResponse . data . consent_request_id } ) )
216
+ //console.log(await response.body.json())
217
+
218
+
219
+ } catch ( error ) {
220
+ console . error ( "Error in consent/request endpoint: " , error ) ;
221
+ return response . status ( 500 ) . json ( { error : 'Internal Server Error' } ) ;
222
+ }
223
+ }
224
+
132
225
@Post ( '/consent' )
133
226
/**
134
227
* Retrieves a consent from OBP for the current user
0 commit comments