feat(Stream): implement SSE streaming support for chat completions

This commit is contained in:
Kyush 2026-04-02 18:37:12 +09:00
commit 882e2c9bbc
4 changed files with 484 additions and 9 deletions

View file

@ -107,6 +107,145 @@ router.post('/chat/completions', async (req: AuthenticatedRequest, res: Response
logger.warn(`Script warnings for user ${user.id}: ${requestErrors.join('; ')}`);
}
// Stream path: pipe SSE response directly to client
if (modifiedContext.request.body && typeof modifiedContext.request.body === 'object' && 'stream' in modifiedContext.request.body && modifiedContext.request.body.stream === true) {
const streamResult = await RouterService.forwardStreamRequest(
backend,
'/v1/chat/completions',
'POST',
modifiedContext.request.headers,
modifiedContext.request.body
);
// Network error — return JSON error
if (!('response' in streamResult)) {
const responseTime = Date.now() - startTime;
AnalyticsService.logRequest({
user_id: user.id,
backend_id: backend.id,
endpoint: '/v1/chat/completions',
request_model: model,
routed_model: resolution.routedModel,
status_code: streamResult.status,
response_time_ms: responseTime,
error_message: JSON.stringify(streamResult.data),
detail_logged: detailLoggingEnabled,
request_headers: detailLoggingEnabled ? modifiedContext.request.headers : undefined,
request_body: detailLoggingEnabled ? modifiedContext.request.body : undefined,
local_date: undefined,
});
logger.error(`Backend error for user ${user.id} (stream): ${JSON.stringify(streamResult.data)}`);
void ModelCatalogService.refreshBackendAfterFailure(backend.id);
res.status(streamResult.status).json(streamResult.data);
return;
}
const backendResponse = streamResult.response;
const backendResponseHeaders = Object.fromEntries(backendResponse.headers.entries());
// Backend returned non-SSE response (e.g. JSON error)
if (!backendResponse.headers.get('content-type')?.includes('text/event-stream')) {
const data = await backendResponse.json().catch(() => ({}));
const responseTime = Date.now() - startTime;
AnalyticsService.logRequest({
user_id: user.id,
backend_id: backend.id,
endpoint: '/v1/chat/completions',
request_model: model,
routed_model: resolution.routedModel,
status_code: backendResponse.status,
response_time_ms: responseTime,
error_message: backendResponse.status >= 400 ? JSON.stringify(data) : undefined,
detail_logged: detailLoggingEnabled,
request_headers: detailLoggingEnabled ? modifiedContext.request.headers : undefined,
request_body: detailLoggingEnabled ? modifiedContext.request.body : undefined,
response_headers: detailLoggingEnabled ? backendResponseHeaders : undefined,
response_body: detailLoggingEnabled ? data : undefined,
local_date: undefined,
});
if (backendResponse.status >= 400) {
void ModelCatalogService.refreshBackendAfterFailure(backend.id);
}
res.status(backendResponse.status).json(data);
return;
}
// onResponse scripts (body not available for streams)
await ScriptEngine.applyOnResponseScripts(
execContext,
{ status: backendResponse.status, headers: backendResponseHeaders, body: null, isStream: true },
user.id,
backend.id
);
// Set SSE headers and start piping
res.status(backendResponse.status);
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
const reader = backendResponse.body!.getReader();
const decoder = new TextDecoder();
req.on('close', () => reader.cancel());
let responseModel: string | undefined;
let usage: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number } | undefined;
const collectedChunks: string[] = [];
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
res.write(value);
// Parse SSE chunks for model and usage metadata
const text = decoder.decode(value, { stream: true });
if (detailLoggingEnabled) collectedChunks.push(text);
for (const line of text.split('\n')) {
if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
try {
const parsed = JSON.parse(line.slice(6));
if (parsed.model && !responseModel) responseModel = parsed.model;
if (parsed.usage) usage = parsed.usage;
} catch { /* non-JSON data line, skip */ }
}
}
} catch (err) {
logger.error(`Stream interrupted for user ${user.id}: ${err instanceof Error ? err.message : String(err)}`);
} finally {
res.end();
}
const responseTime = Date.now() - startTime;
AnalyticsService.logRequest({
user_id: user.id,
backend_id: backend.id,
endpoint: '/v1/chat/completions',
request_model: model,
routed_model: resolution.routedModel,
response_model: responseModel,
prompt_tokens: usage?.prompt_tokens,
completion_tokens: usage?.completion_tokens,
total_tokens: usage?.total_tokens,
status_code: backendResponse.status,
response_time_ms: responseTime,
detail_logged: detailLoggingEnabled,
request_headers: detailLoggingEnabled ? modifiedContext.request.headers : undefined,
request_body: detailLoggingEnabled ? modifiedContext.request.body : undefined,
response_headers: detailLoggingEnabled ? backendResponseHeaders : undefined,
response_body: detailLoggingEnabled ? collectedChunks.join('') : undefined,
local_date: undefined,
});
if (backendResponse.status >= 400) {
void ModelCatalogService.refreshBackendAfterFailure(backend.id);
}
return;
}
// Non-stream path: buffer and return JSON (unchanged)
const response = await RouterService.forwardRequest(
backend,
'/v1/chat/completions',
@ -121,7 +260,7 @@ router.post('/chat/completions', async (req: AuthenticatedRequest, res: Response
status: response.status,
headers: response.headers,
body: response.data,
isStream: req.body.stream === true,
isStream: false,
};
await ScriptEngine.applyOnResponseScripts(

View file

@ -35,18 +35,18 @@ export class RouterService {
return backends[roundRobinIndex];
}
static async forwardRequest(
private static async rawFetch(
backend: Backend,
path: string,
method: string,
headers: Record<string, string>,
body?: unknown
): Promise<{ status: number; data: unknown; headers: Record<string, string> }> {
): Promise<Response> {
let backendPath = path;
if (backend.base_url.includes('/v1')) {
backendPath = path.replace(/^\/v1/, '');
}
const backendUrl = backend.base_url.replace(/\/$/, '') + backendPath;
const fetchHeaders: Record<string, string> = {
@ -71,12 +71,22 @@ export class RouterService {
fetchHeaders['Authorization'] = `Bearer ${backend.api_key}`;
}
return fetch(backendUrl, {
method,
headers: fetchHeaders,
body: preparedBody,
});
}
static async forwardRequest(
backend: Backend,
path: string,
method: string,
headers: Record<string, string>,
body?: unknown
): Promise<{ status: number; data: unknown; headers: Record<string, string> }> {
try {
const response = await fetch(backendUrl, {
method,
headers: fetchHeaders,
body: preparedBody,
});
const response = await this.rawFetch(backend, path, method, headers, body);
const data = await response.json().catch(() => ({}));
const responseHeaders = Object.fromEntries(response.headers.entries());
@ -143,4 +153,72 @@ export class RouterService {
};
}
}
static async forwardStreamRequest(
backend: Backend,
path: string,
method: string,
headers: Record<string, string>,
body?: unknown
): Promise<
| { response: Response }
| { status: number; data: unknown; headers: Record<string, string> }
> {
try {
const response = await this.rawFetch(backend, path, method, headers, body);
return { response };
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
let cause: string | undefined;
let errorType: string;
if (error instanceof Error && error.cause) {
const causeError = error.cause as any;
const causeCode = causeError.code || causeError.errno;
const causeSyscall = causeError.syscall;
const causeAddress = causeError.address || causeError.hostname;
const causePort = causeError.port;
if (causeCode === 'ECONNREFUSED') {
errorType = 'Backend connection refused';
cause = causeAddress && causePort
? `Backend server at ${causeAddress}:${causePort} is not accepting connections`
: 'Backend server is not accepting connections';
} else if (causeCode === 'ETIMEDOUT' || causeCode === 'ECONNABORTED') {
errorType = 'Backend request timeout';
cause = 'Connection to backend timed out';
} else if (causeCode === 'ENOTFOUND') {
errorType = 'Backend unreachable';
cause = causeAddress ? `Could not resolve hostname: ${causeAddress}` : 'Could not resolve backend hostname';
} else if (causeCode === 'EPIPE' || causeSyscall === 'write') {
errorType = 'Backend connection lost';
cause = causeSyscall ? `Connection broken during ${causeSyscall} operation` : 'Connection broken during operation';
} else {
errorType = 'Backend connection error';
cause = `${causeCode || 'Unknown error'} during ${causeSyscall || 'connection'}`;
}
} else if (errorMsg.includes('ETIMEDOUT') || errorMsg.includes('ECONNABORTED')) {
errorType = 'Backend request timeout';
cause = 'Connection timed out after 30s';
} else if (errorMsg.includes('aborted')) {
errorType = 'Request aborted';
cause = 'Request was aborted before completion';
} else {
errorType = 'Failed to forward request to backend';
cause = errorMsg;
}
return {
status: 502,
data: {
error: errorType,
cause: cause,
backend: backend.base_url,
path: path,
},
headers: {},
};
}
}
}

View file

@ -0,0 +1,240 @@
import { describe, it, expect, beforeAll, afterAll, afterEach } from 'vitest';
import request from 'supertest';
import http from 'http';
import { createTestApp } from '../utils/testApp';
import { createMockBackend } from '../utils/mockBackend';
import { initDb } from '../../src/config/database';
import { createAdminClient } from '../utils/adminClient';
/**
* These tests verify that the router correctly proxies SSE streaming responses
* from backends when the client sends `stream: true`.
*
* Current expectation: these tests FAIL because RouterService.forwardRequest()
* calls response.json() which buffers the entire response and breaks SSE.
*/
describe('Streaming Response Proxying', () => {
let app: ReturnType<typeof createTestApp>;
let admin: Awaited<ReturnType<typeof createAdminClient>>;
let mockServer: any;
const sampleStreamChunks = [
JSON.stringify({
id: 'chatcmpl-stream-1',
object: 'chat.completion.chunk',
model: 'mock-model',
choices: [{ index: 0, delta: { role: 'assistant', content: 'Hello' }, finish_reason: null }],
}),
JSON.stringify({
id: 'chatcmpl-stream-1',
object: 'chat.completion.chunk',
model: 'mock-model',
choices: [{ index: 0, delta: { content: ' world' }, finish_reason: null }],
}),
JSON.stringify({
id: 'chatcmpl-stream-1',
object: 'chat.completion.chunk',
model: 'mock-model',
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
}),
];
beforeAll(() => {
initDb();
app = createTestApp();
});
beforeAll(async () => {
admin = await createAdminClient(app);
// Deactivate all existing backends
const allBackendsResponse = await admin.get('/admin/backends');
for (const backend of allBackendsResponse.body) {
if (backend.is_active) {
await admin.put(`/admin/backends/${backend.id}`).send({ is_active: false });
}
}
});
afterEach(async () => {
if (mockServer) {
await new Promise<void>(resolve => mockServer.close(resolve));
mockServer = undefined;
}
});
afterAll(async () => {
// Re-activate all backends
const allBackendsResponse = await admin.get('/admin/backends');
for (const backend of allBackendsResponse.body) {
if (!backend.is_active) {
await admin.put(`/admin/backends/${backend.id}`).send({ is_active: true });
}
}
});
async function setupUserAndBackend(mockPort: number) {
// Deactivate all existing backends to ensure only our mock backend is selected
const allBackendsResponse = await admin.get('/admin/backends');
for (const backend of allBackendsResponse.body) {
if (backend.is_active) {
await admin.put(`/admin/backends/${backend.id}`).send({ is_active: false });
}
}
const userResponse = await admin.post('/admin/users').send({ name: `Stream Test User ${Date.now()}` });
const userApiKey = userResponse.body.api_key;
const userId = userResponse.body.id;
const backendResponse = await admin.post('/admin/backends').send({
name: `Stream Backend ${Date.now()}`,
base_url: `http://localhost:${mockPort}`,
});
const backendId = backendResponse.body.id;
await admin.post('/admin/permissions').send({ user_id: userId, backend_id: backendId });
return { userApiKey, userId, backendId };
}
it('should return Content-Type text/event-stream for stream requests', async () => {
const { server, port } = createMockBackend({
streamChunks: sampleStreamChunks,
modelsResponse: [{ id: 'mock-model', object: 'model' }],
});
mockServer = server;
const { userApiKey } = await setupUserAndBackend(port);
const response = await request(app)
.post('/v1/chat/completions')
.set('Authorization', `Bearer ${userApiKey}`)
.send({
model: 'mock-model',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
// The router should forward the SSE content-type from the backend
expect(response.headers['content-type']).toMatch(/text\/event-stream/);
});
it('should forward all SSE chunks from backend to client', async () => {
const { server, port } = createMockBackend({
streamChunks: sampleStreamChunks,
modelsResponse: [{ id: 'mock-model', object: 'model' }],
});
mockServer = server;
const { userApiKey } = await setupUserAndBackend(port);
const response = await request(app)
.post('/v1/chat/completions')
.set('Authorization', `Bearer ${userApiKey}`)
.send({
model: 'mock-model',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
const body = response.text;
// Should contain all three data chunks plus [DONE]
const dataLines = body.split('\n').filter((line: string) => line.startsWith('data: '));
expect(dataLines.length).toBe(4); // 3 chunks + [DONE]
// Verify first chunk
const firstChunk = JSON.parse(dataLines[0].replace('data: ', ''));
expect(firstChunk.choices[0].delta.content).toBe('Hello');
// Verify second chunk
const secondChunk = JSON.parse(dataLines[1].replace('data: ', ''));
expect(secondChunk.choices[0].delta.content).toBe(' world');
// Verify last data line is [DONE]
expect(dataLines[3]).toBe('data: [DONE]');
});
it('should not buffer the stream into a JSON response', async () => {
const { server, port } = createMockBackend({
streamChunks: sampleStreamChunks,
modelsResponse: [{ id: 'mock-model', object: 'model' }],
});
mockServer = server;
const { userApiKey } = await setupUserAndBackend(port);
const response = await request(app)
.post('/v1/chat/completions')
.set('Authorization', `Bearer ${userApiKey}`)
.send({
model: 'mock-model',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
// The response should NOT be a JSON object (which is what happens when
// RouterService buffers with response.json())
expect(response.headers['content-type']).not.toMatch(/application\/json/);
// The response text should contain SSE data lines, not be empty
expect(response.text).toContain('data: ');
});
it('should still return JSON for non-stream requests', async () => {
const { server, port } = createMockBackend({
streamChunks: sampleStreamChunks,
chatResponse: {
id: 'non-stream-1',
model: 'mock-model',
choices: [{ index: 0, message: { role: 'assistant', content: 'Hello' }, finish_reason: 'stop' }],
usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 },
},
modelsResponse: [{ id: 'mock-model', object: 'model' }],
});
mockServer = server;
const { userApiKey } = await setupUserAndBackend(port);
const response = await request(app)
.post('/v1/chat/completions')
.set('Authorization', `Bearer ${userApiKey}`)
.send({
model: 'mock-model',
messages: [{ role: 'user', content: 'Hello' }],
// stream is not set (or false)
});
expect(response.headers['content-type']).toMatch(/application\/json/);
expect(response.body.id).toBe('non-stream-1');
expect(response.body.choices).toHaveLength(1);
});
it('should pass stream flag through to the backend', async () => {
let receivedBody: any;
const { server, port } = createMockBackend({
streamChunks: sampleStreamChunks,
modelsResponse: [{ id: 'mock-model', object: 'model' }],
onRequest: (req) => {
receivedBody = req.body;
},
});
mockServer = server;
const { userApiKey } = await setupUserAndBackend(port);
const response = await request(app)
.post('/v1/chat/completions')
.set('Authorization', `Bearer ${userApiKey}`)
.send({
model: 'mock-model',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
// Ensure the request actually reached the mock backend (not a 502)
expect(response.status).toBe(200);
expect(receivedBody).toBeDefined();
expect(receivedBody.stream).toBe(true);
});
});

View file

@ -13,6 +13,8 @@ export interface MockBackendOptions {
}>;
usage: { prompt_tokens: number; completion_tokens: number; total_tokens: number };
}>;
/** SSE stream chunks to send when request has stream: true. Each string becomes one SSE data line. */
streamChunks?: string[];
modelsResponse?: Array<{ id: string; object: string }>;
}
@ -26,6 +28,7 @@ export function createMockBackend(options: MockBackendOptions = {}) {
choices: [{ index: 0, message: { role: 'assistant', content: 'Hello' }, finish_reason: 'stop' }],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 }
},
streamChunks,
modelsResponse = [{ id: 'mock-model', object: 'model' }]
} = options;
@ -34,6 +37,21 @@ export function createMockBackend(options: MockBackendOptions = {}) {
app.post('/v1/chat/completions', (req, res) => {
onRequest?.(req);
if (req.body.stream === true && streamChunks) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
for (const chunk of streamChunks) {
res.write(`data: ${chunk}\n\n`);
}
res.write('data: [DONE]\n\n');
res.end();
return;
}
res.json(chatResponse);
});