Skip to content

Commit cd13f05

Browse files
committed
integration tests passing
1 parent 29c9812 commit cd13f05

File tree

6 files changed

+131
-119
lines changed

6 files changed

+131
-119
lines changed

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"@element-plus/icons-vue": "^2.1.0",
2222
"@fontsource/roboto": "^5.0.0",
2323
"@highlightjs/vue-plugin": "^2.1.0",
24+
"@types/node-fetch": "^2.6.12",
2425
"ai": "^4.1.43",
2526
"axios": "^1.7.4",
2627
"cheerio": "^1.0.0",
@@ -35,6 +36,7 @@
3536
"json-editor-vue": "^0.17.3",
3637
"jsonwebtoken": "^9.0.2",
3738
"markdown-it": "^14.1.0",
39+
"node-fetch": "^2.6.7",
3840
"oauth": "^0.10.0",
3941
"obp-typescript": "^1.0.36",
4042
"pinia": "^2.0.37",

server/controllers/OpeyIIController.ts

+28-50
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
import { streamText } from 'ai'
2-
import axios from 'axios'
31
import { Controller, Session, Req, Res, Post, Get } from 'routing-controllers'
42
import { Request, Response } from 'express'
3+
import { pipeline } from "node:stream/promises"
54
import { Service } from 'typedi'
65
import OBPClientService from '../services/OBPClientService'
76
import OpeyClientService from '../services/OpeyClientService'
8-
import { v6 as uuid6 } from 'uuid';
9-
import { Transform } from 'stream'
7+
108
import { UserInput } from '../schema/OpeySchema'
119

1210
@Service()
@@ -41,7 +39,7 @@ export class OpeyController {
4139
async streamOpey(
4240
@Session() session: any,
4341
@Req() request: Request,
44-
@Res() response: Response
42+
@Res() response: Response,
4543
) {
4644

4745
let user_input: UserInput
@@ -59,66 +57,46 @@ export class OpeyController {
5957

6058
console.log("Calling OpeyClientService.stream")
6159

62-
const streamMiddlewareTransform = new Transform({
63-
transform(chunk, encoding, callback) {
64-
console.log(`Logged Chunk: ${chunk}`)
65-
this.push(chunk);
60+
// const streamMiddlewareTransform = new Transform({
61+
// transform(chunk, encoding, callback) {
62+
// console.log(`Logged Chunk: ${chunk}`)
63+
// this.push(chunk);
6664

67-
callback();
68-
}
69-
})
65+
// callback();
66+
// }
67+
// })
7068

71-
let stream: ReadableStream | null = null
69+
let stream: NodeJS.ReadableStream | null = null
7270

7371
try {
7472
// Read stream from OpeyClientService
7573
stream = await this.opeyClientService.stream(user_input)
76-
console.debug(`Stream received readable: ${stream}`)
74+
console.debug(`Stream received readable: ${stream?.readable}`)
7775

7876
} catch (error) {
7977
console.error("Error reading stream: ", error)
80-
response.status(500).json({ error: 'Internal Server Error' })
81-
return
78+
return response.status(500).json({ error: 'Internal Server Error' })
8279
}
8380

84-
if (!stream) {
85-
console.error("Stream is not readable")
86-
response.status(500).json({ error: 'Internal Server Error' })
87-
return
81+
if (!stream || !stream.readable) {
82+
console.error("Stream is not recieved or not readable")
83+
return response.status(500).json({ error: 'Internal Server Error' })
8884
}
8985

90-
try {
91-
// response.writeHead(200, {
92-
// 'Content-Type': "text/event-stream",
93-
// 'Cache-Control': "no-cache",
94-
// 'Connection': "keep-alive"
95-
// });
96-
97-
response.setHeader('Content-Type', 'text/event-stream')
98-
response.setHeader('Cache-Control', 'no-cache')
99-
response.setHeader('Connection', 'keep-alive')
100-
101-
let data: any[] = []
86+
return new Promise<Response>((resolve, reject) => {
87+
stream.pipe(response)
88+
stream.on('end', () => {
89+
response.status(200)
90+
resolve(response)
91+
})
92+
stream.on('error', (error) => {
93+
console.error("Error piping stream: ", error)
94+
reject(error)
95+
})
10296

103-
const streamReader = stream.getReader()
104-
console.log("Got stream reader: ", streamReader)
105-
106-
streamReader
107-
.read()
108-
.then(function processText({ done, value }) {
109-
if (done) {
110-
console.log("Stream done")
111-
return response.status(200).json(data)
112-
}
113-
console.log("Stream value: ", value)
114-
data.push(value)
115-
response.write(`data: ${value}\n\n`)
116-
})
97+
})
11798

118-
} catch (error) {
119-
console.error("Error writing data: ", error)
120-
response.status(500).json({ error: 'Internal Server Error' })
121-
}
99+
122100
}
123101

124102
@Post('/invoke')

server/services/OpeyClientService.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Service } from 'typedi'
22
import { UserInput, StreamInput, OpeyConfig, AuthConfig } from '../schema/OpeySchema'
3+
import { Readable } from "stream"
4+
import fetch from 'node-fetch'
35

46
@Service()
57
export default class OpeyClientService {
@@ -46,7 +48,7 @@ export default class OpeyClientService {
4648
}
4749
}
4850

49-
async stream(user_input: UserInput): Promise<ReadableStream> {
51+
async stream(user_input: UserInput): Promise<any> {
5052
// Endpoint to post a message to Opey and stream the response tokens/messages
5153
try {
5254

@@ -68,6 +70,9 @@ export default class OpeyClientService {
6870
if (!response.body) {
6971
throw new Error("No response body")
7072
}
73+
74+
console.log("Got response body: ", response.body) //DEBUG
75+
7176
return response.body
7277
}
7378
catch (error) {

tests/opey-controller.test.ts

+26-21
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,15 @@ describe('OpeyController', () => {
111111
expect(res.statusCode).toBe(200);
112112
})
113113

114-
it('streamOpey', () => {
114+
it('streamOpey', async () => {
115115

116116
const _eventEmitter = new EventEmitter();
117117
_eventEmitter.addListener('data', () => {
118118
console.log('Data received')
119119
})
120120
// The default event emitter does nothing, so replace
121-
const res = httpMocks.createResponse({
122-
eventEmitter: _eventEmitter,
121+
const res = await httpMocks.createResponse({
122+
eventEmitter: EventEmitter,
123123
writableStream: Stream.Writable
124124
});
125125

@@ -132,27 +132,32 @@ describe('OpeyController', () => {
132132
} as unknown as Request;
133133

134134
// Define handelrs for events
135-
res.on('end', () => {
136-
console.log('Stream ended')
137-
console.log(res._getData())
138-
expect(res.statusCode).toBe(200);
139-
})
135+
140136

137+
141138
let chunks: any[] = [];
142-
res.on('data', (chunk) => {
143-
console.log(chunk)
144-
chunks.push(chunk);
145-
expect(chunk).toBeDefined();
146-
})
147-
148-
opeyController.streamOpey({}, req, res)
149-
.then((res) => {
150-
console.log(res)
151-
})
139+
try {
140+
const response = await opeyController.streamOpey({}, req, res)
141+
142+
response.on('end', async () => {
143+
console.log('Stream ended')
144+
console.log(res._getData())
145+
await expect(res.statusCode).toBe(200);
146+
})
147+
148+
response.on('data', async (chunk) => {
149+
console.log(chunk)
150+
await chunks.push(chunk);
151+
await expect(chunk).toBeDefined();
152+
})
153+
} catch (error) {
154+
console.error(error)
155+
}
156+
152157

153-
expect(chunks.length).toBe(10);
154-
expect(MockOpeyClientService.stream).toHaveBeenCalled();
155-
expect(res).toBeDefined();
158+
await expect(chunks.length).toBe(10);
159+
await expect(MockOpeyClientService.stream).toHaveBeenCalled();
160+
await expect(res).toBeDefined();
156161

157162
})
158163
})

tests/opey.test.ts

+36-47
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import http from 'node:http';
44
import { UserInput } from '../server/schema/OpeySchema';
55
import {v4 as uuidv4} from 'uuid';
66
import { agent } from "superagent";
7+
import fetch from 'node-fetch';
78

89

910
const BEFORE_ALL_TIMEOUT = 30000; // 30 sec
@@ -51,76 +52,64 @@ describe('GET /api/opey/invoke', () => {
5152

5253
describe('POST /api/opey/stream', () => {
5354

54-
let streamingResponse;
55+
let data: Array<string> = [];
56+
let res;
5557

5658
let userInput: UserInput = {
5759
message: "Hello Opey",
5860
thread_id: uuidv4(),
5961
is_tool_call_approval: false
6062
}
6163

62-
const httpAgent = new http.Agent({ keepAlive: true, port: 9999 });
6364

6465
beforeAll(async () => {
6566
app.listen(5173)
66-
67-
try {
68-
streamingResponse = await fetch(`${SERVER_URL}/api/opey/stream`, {
69-
method: 'POST',
70-
headers: {
71-
'Content-Type': 'application/json',
72-
'connection': 'keep-alive'
73-
},
74-
body: JSON.stringify(userInput),
75-
})
76-
} catch (error) {
77-
console.error(`Error getting stream: ${error}`)
78-
}
79-
8067
});
8168

8269
afterAll(async () => {
8370
instance.close()
84-
httpAgent.destroy()
8571
});
8672

8773
it
8874

8975
it('Should stream response', async () => {
90-
9176

92-
77+
try {
78+
const response = await fetch(`${SERVER_URL}/api/opey/stream`, {
79+
method: 'POST',
80+
headers: {
81+
'Content-Type': 'application/json',
82+
'connection': 'keep-alive'
83+
},
84+
body: JSON.stringify(userInput),
85+
});
9386

94-
// const response = await request(app)
95-
// .post("/api/opey/stream")
96-
// .set('Content-Type', 'text/event-stream')
97-
// .responseType('blob')
98-
// .send(userInput)
87+
console.log(`Response in test: ${response.body}`)
88+
const stream = response.body
89+
90+
stream.on('data', (chunk) => {
91+
console.log(`chunk: ${chunk}`)
92+
// check if chunk is not empty
93+
expect(chunk).toBeTruthy()
94+
})
95+
stream.on('end', () => {
96+
console.log('Stream ended')
97+
})
98+
stream.on('error', (error) => {
99+
console.error(`Error in stream: ${error}`)
100+
})
101+
102+
res = response;
103+
104+
await expect(res.status).toBe(200)
105+
106+
} catch (error) {
107+
console.error(`Error fetching stream from test: ${error}`)
108+
}
99109

100-
expect(streamingResponse.status).toBe(200)
101-
102-
streamingResponse.body.on('data', (chunk) => {
103-
console.log(`${chunk}`)
104-
})
105-
// response.on
106-
// console.log(response.body)
107-
// const readable = response.body
108-
// readable.on('data', (chunk) => {
109-
// const data = chunk.toString()
110-
// console.log(`data: ${data}`)
111-
// })
112-
113-
114110

115111

116-
// while (true) {
117-
// const {value, done} = await reader.read();
118-
// if (done) break;
119-
// console.log('Received', value);
120-
// }
121-
122-
// expect(response.headers['content-type']).toBe('text/event-stream')
123-
// expect(response.status).toBe(200)
124-
// Optionally, parse chunks or check SSE headers
112+
113+
125114
})
126115
});

0 commit comments

Comments
 (0)