File tree Expand file tree Collapse file tree 8 files changed +51
-51
lines changed
examples/ai-core/src/stream-text Expand file tree Collapse file tree 8 files changed +51
-51
lines changed Original file line number Diff line number Diff line change 1+ ---
2+ ' @ai-sdk/provider-utils ' : patch
3+ ' ai ' : patch
4+ ---
5+
6+ ai/core: fix abort handling in transformation stream
Original file line number Diff line number Diff line change @@ -5,33 +5,24 @@ import dotenv from 'dotenv';
55dotenv . config ( ) ;
66
77async function main ( ) {
8- const abortController = new AbortController ( ) ;
9-
10- // run async:
11- ( async ( ) => {
12- await delay ( 1500 ) ; // wait 1.5 seconds
13- abortController . abort ( ) ; // aborts the streaming
14- } ) ( ) ;
15-
168 try {
179 const { textStream } = await experimental_streamText ( {
1810 model : openai ( 'gpt-3.5-turbo' ) ,
1911 prompt : 'Write a short story about a robot learning to love:\n\n' ,
20- abortSignal : abortController . signal ,
12+ abortSignal : AbortSignal . timeout ( 3000 ) ,
2113 } ) ;
2214
2315 for await ( const textPart of textStream ) {
2416 process . stdout . write ( textPart ) ;
2517 }
2618 } catch ( error ) {
27- if ( error instanceof Error && error . name === 'AbortError' ) {
19+ if (
20+ error instanceof Error &&
21+ ( error . name === 'AbortError' || error . name === 'TimeoutError' )
22+ ) {
2823 console . log ( '\n\nAbortError: The run was aborted.' ) ;
2924 }
3025 }
3126}
3227
3328main ( ) . catch ( console . error ) ;
34-
35- async function delay ( delayInMs : number ) : Promise < void > {
36- return new Promise ( resolve => setTimeout ( resolve , delayInMs ) ) ;
37- }
Original file line number Diff line number Diff line change @@ -162,27 +162,30 @@ export function runToolsTransformation<
162162 // combine the generator stream and the tool results stream
163163 return new ReadableStream < TextStreamPart < TOOLS > > ( {
164164 async start ( controller ) {
165- generatorStream . pipeThrough ( forwardStream ) . pipeTo (
166- new WritableStream ( {
167- write ( chunk ) {
168- controller . enqueue ( chunk ) ;
169- } ,
170- close ( ) {
171- // the generator stream controller is automatically closed when it's consumed
172- } ,
173- } ) ,
174- ) ;
175-
176- toolResultsStream . pipeTo (
177- new WritableStream ( {
178- write ( chunk ) {
179- controller . enqueue ( chunk ) ;
180- } ,
181- close ( ) {
182- controller . close ( ) ;
183- } ,
184- } ) ,
185- ) ;
165+ // need to wait for both pipes so there are no dangling promises that
166+ // can cause uncaught promise rejections when the stream is aborted
167+ return Promise . all ( [
168+ generatorStream . pipeThrough ( forwardStream ) . pipeTo (
169+ new WritableStream ( {
170+ write ( chunk ) {
171+ controller . enqueue ( chunk ) ;
172+ } ,
173+ close ( ) {
174+ // the generator stream controller is automatically closed when it's consumed
175+ } ,
176+ } ) ,
177+ ) ,
178+ toolResultsStream . pipeTo (
179+ new WritableStream ( {
180+ write ( chunk ) {
181+ controller . enqueue ( chunk ) ;
182+ } ,
183+ close ( ) {
184+ controller . close ( ) ;
185+ } ,
186+ } ) ,
187+ ) ,
188+ ] ) ;
186189 } ,
187190 } ) ;
188191}
Original file line number Diff line number Diff line change 44 LanguageModelV1FinishReason ,
55 LanguageModelV1LogProbs ,
66} from '@ai-sdk/provider' ;
7+ import { ServerResponse } from 'node:http' ;
78import {
89 AIStreamCallbacksAndOptions ,
9- StreamData ,
1010 StreamingTextResponse ,
1111 createCallbacksTransformer ,
1212 createStreamDataTransformer ,
@@ -26,7 +26,6 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back
2626import { runToolsTransformation } from './run-tools-transformation' ;
2727import { ToToolCall } from './tool-call' ;
2828import { ToToolResult } from './tool-result' ;
29- import { ServerResponse } from 'node:http' ;
3029
3130/**
3231Generate a text and call tools for a given prompt using a language model.
Original file line number Diff line number Diff line change 11import { APICallError , RetryError } from '@ai-sdk/provider' ;
2- import { getErrorMessage } from '@ai-sdk/provider-utils' ;
2+ import { getErrorMessage , isAbortError } from '@ai-sdk/provider-utils' ;
33import { delay } from './delay' ;
44
55export type RetryFunction = < OUTPUT > (
@@ -35,7 +35,7 @@ async function _retryWithExponentialBackoff<OUTPUT>(
3535 try {
3636 return await f ( ) ;
3737 } catch ( error ) {
38- if ( error instanceof Error && error . name === 'AbortError' ) {
38+ if ( isAbortError ( error ) ) {
3939 throw error ; // don't retry when the request was aborted
4040 }
4141
Original file line number Diff line number Diff line change 11export * from './extract-response-headers' ;
22export * from './generate-id' ;
33export * from './get-error-message' ;
4+ export * from './is-abort-error' ;
45export * from './load-api-key' ;
56export * from './parse-json' ;
67export * from './post-to-api' ;
Original file line number Diff line number Diff line change 1+ export function isAbortError ( error : unknown ) : error is DOMException {
2+ return (
3+ error instanceof DOMException &&
4+ ( error . name === 'AbortError' || error . name === 'TimeoutError' )
5+ ) ;
6+ }
Original file line number Diff line number Diff line change 11import { APICallError } from '@ai-sdk/provider' ;
2+ import { isAbortError } from './is-abort-error' ;
23import { ResponseHandler } from './response-handler' ;
34
45export const postJsonToApi = async < T > ( {
@@ -70,13 +71,8 @@ export const postToApi = async <T>({
7071 requestBodyValues : body . values ,
7172 } ) ;
7273 } catch ( error ) {
73- if ( error instanceof Error ) {
74- if (
75- error . name === 'AbortError' ||
76- APICallError . isAPICallError ( error )
77- ) {
78- throw error ;
79- }
74+ if ( isAbortError ( error ) || APICallError . isAPICallError ( error ) ) {
75+ throw error ;
8076 }
8177
8278 throw new APICallError ( {
@@ -97,7 +93,7 @@ export const postToApi = async <T>({
9793 } ) ;
9894 } catch ( error ) {
9995 if ( error instanceof Error ) {
100- if ( error . name === 'AbortError' || APICallError . isAPICallError ( error ) ) {
96+ if ( isAbortError ( error ) || APICallError . isAPICallError ( error ) ) {
10197 throw error ;
10298 }
10399 }
@@ -111,10 +107,8 @@ export const postToApi = async <T>({
111107 } ) ;
112108 }
113109 } catch ( error ) {
114- if ( error instanceof Error ) {
115- if ( error . name === 'AbortError' ) {
116- throw error ;
117- }
110+ if ( isAbortError ( error ) ) {
111+ throw error ;
118112 }
119113
120114 // unwrap original error when fetch failed (for easier debugging):
You can’t perform that action at this time.
0 commit comments