Skip to content

Commit 0e0d2af

Browse files
authored
ai/core: add pipeTextStreamToResponse helper to streamText. (#1442)
1 parent 5189802 commit 0e0d2af

File tree

3 files changed

+130
-2
lines changed

3 files changed

+130
-2
lines changed

‎.changeset/small-islands-melt.md‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
ai/core: add pipeTextStreamToResponse helper to streamText.

‎packages/core/core/generate-text/stream-text.test.ts‎

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ describe('result.toAIStream', () => {
269269
});
270270

271271
describe('result.pipeAIStreamToResponse', async () => {
272-
it('should write text deltas to a Node.js response-like object', async () => {
272+
it('should write data stream parts to a Node.js response-like object', async () => {
273273
const mockResponse = createMockServerResponse();
274274

275275
const result = await experimental_streamText({
@@ -315,6 +315,92 @@ describe('result.pipeAIStreamToResponse', async () => {
315315
});
316316
});
317317

318+
describe('result.pipeTextStreamToResponse', async () => {
319+
it('should write text deltas to a Node.js response-like object', async () => {
320+
const mockResponse = createMockServerResponse();
321+
322+
const result = await experimental_streamText({
323+
model: new MockLanguageModelV1({
324+
doStream: async () => {
325+
return {
326+
stream: convertArrayToReadableStream([
327+
{ type: 'text-delta', textDelta: 'Hello' },
328+
{ type: 'text-delta', textDelta: ', ' },
329+
{ type: 'text-delta', textDelta: 'world!' },
330+
]),
331+
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
332+
};
333+
},
334+
}),
335+
prompt: 'test-input',
336+
});
337+
338+
result.pipeTextStreamToResponse(mockResponse);
339+
340+
// Wait for the stream to finish writing to the mock response
341+
await new Promise(resolve => {
342+
const checkIfEnded = () => {
343+
if (mockResponse.ended) {
344+
resolve(undefined);
345+
} else {
346+
setImmediate(checkIfEnded);
347+
}
348+
};
349+
checkIfEnded();
350+
});
351+
352+
const decoder = new TextDecoder();
353+
354+
assert.strictEqual(mockResponse.statusCode, 200);
355+
assert.deepStrictEqual(mockResponse.headers, {
356+
'Content-Type': 'text/plain; charset=utf-8',
357+
});
358+
assert.deepStrictEqual(
359+
mockResponse.writtenChunks.map(chunk => decoder.decode(chunk)),
360+
['Hello', ', ', 'world!'],
361+
);
362+
});
363+
});
364+
365+
describe('result.toAIStreamResponse', () => {
366+
it('should create a Response with a stream data stream', async () => {
367+
const result = await experimental_streamText({
368+
model: new MockLanguageModelV1({
369+
doStream: async ({ prompt, mode }) => {
370+
return {
371+
stream: convertArrayToReadableStream([
372+
{ type: 'text-delta', textDelta: 'Hello' },
373+
{ type: 'text-delta', textDelta: ', ' },
374+
{ type: 'text-delta', textDelta: 'world!' },
375+
]),
376+
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
377+
};
378+
},
379+
}),
380+
prompt: 'test-input',
381+
});
382+
383+
const response = result.toAIStreamResponse();
384+
385+
assert.strictEqual(response.status, 200);
386+
assert.strictEqual(
387+
response.headers.get('Content-Type'),
388+
'text/plain; charset=utf-8',
389+
);
390+
391+
// Read the chunks into an array
392+
const reader = response.body!.getReader();
393+
const chunks = [];
394+
while (true) {
395+
const { value, done } = await reader.read();
396+
if (done) break;
397+
chunks.push(new TextDecoder().decode(value));
398+
}
399+
400+
assert.deepStrictEqual(chunks, ['0:"Hello"\n', '0:", "\n', '0:"world!"\n']);
401+
});
402+
});
403+
318404
describe('result.toTextStreamResponse', () => {
319405
it('should create a Response with a text stream', async () => {
320406
const result = await experimental_streamText({

‎packages/core/core/generate-text/stream-text.ts‎

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ Stream callbacks that will be called when the stream emits events.
240240
/**
241241
Writes stream data output to a Node.js response-like object.
242242
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
243-
writes each text delta as a separate chunk.
243+
writes each stream data part as a separate chunk.
244244
245245
@param response A Node.js response-like object (ServerResponse).
246246
@param init Optional headers and status code.
@@ -276,6 +276,43 @@ writes each text delta as a separate chunk.
276276
read();
277277
}
278278

279+
/**
280+
Writes text delta output to a Node.js response-like object.
281+
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
282+
writes each text delta as a separate chunk.
283+
284+
@param response A Node.js response-like object (ServerResponse).
285+
@param init Optional headers and status code.
286+
*/
287+
pipeTextStreamToResponse(
288+
response: ServerResponse,
289+
init?: { headers?: Record<string, string>; status?: number },
290+
) {
291+
response.writeHead(init?.status ?? 200, {
292+
'Content-Type': 'text/plain; charset=utf-8',
293+
...init?.headers,
294+
});
295+
296+
const reader = this.textStream.getReader();
297+
298+
const read = async () => {
299+
const encoder = new TextEncoder();
300+
try {
301+
while (true) {
302+
const { done, value } = await reader.read();
303+
if (done) break;
304+
response.write(encoder.encode(value));
305+
}
306+
} catch (error) {
307+
throw error;
308+
} finally {
309+
response.end();
310+
}
311+
};
312+
313+
read();
314+
}
315+
279316
/**
280317
Converts the result to a streamed response object with a stream data part stream.
281318
It can be used with the `useChat` and `useCompletion` hooks.

0 commit comments

Comments
 (0)