Skip to content

Commit b4c68ec

Browse files
authored
ai/rsc: ReadableStream as provider for streamable value; add .append() method (#1460)
1 parent 400b2b1 commit b4c68ec

File tree

3 files changed

+416
-2
lines changed

3 files changed

+416
-2
lines changed

‎.changeset/clean-planes-reflect.md‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
ai/rsc: ReadableStream as provider for createStreamableValue; add .append() method

‎packages/core/rsc/shared-client/streamable.ui.test.tsx‎

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,5 +228,282 @@ describe('rsc - readStreamableValue()', () => {
228228
]
229229
`);
230230
});
231+
232+
it('should be able to call .append() to send patches', async () => {
233+
const streamable = createStreamableValue();
234+
const value = streamable.value;
235+
236+
streamable.append('hello');
237+
streamable.append(' world');
238+
streamable.append('!');
239+
streamable.done();
240+
241+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
242+
[
243+
{
244+
"curr": undefined,
245+
"type": Symbol(ui.streamable.value),
246+
},
247+
{
248+
"curr": "hello",
249+
},
250+
{
251+
"diff": [
252+
0,
253+
" world",
254+
],
255+
},
256+
{
257+
"diff": [
258+
0,
259+
"!",
260+
],
261+
},
262+
{},
263+
]
264+
`);
265+
266+
const values = [];
267+
for await (const v of readStreamableValue(value)) {
268+
values.push(v);
269+
}
270+
expect(values).toMatchInlineSnapshot(`
271+
[
272+
"hello",
273+
"hello world",
274+
"hello world!",
275+
]
276+
`);
277+
});
278+
279+
it('should be able to mix .update() and .append() with optimized payloads', async () => {
280+
const streamable = createStreamableValue('hello');
281+
const value = streamable.value;
282+
283+
streamable.append(' world');
284+
streamable.update('hello world!!');
285+
streamable.update('some new');
286+
streamable.update('some new string');
287+
streamable.append(' with patch!');
288+
streamable.done();
289+
290+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
291+
[
292+
{
293+
"curr": "hello",
294+
"type": Symbol(ui.streamable.value),
295+
},
296+
{
297+
"diff": [
298+
0,
299+
" world",
300+
],
301+
},
302+
{
303+
"diff": [
304+
0,
305+
"!!",
306+
],
307+
},
308+
{
309+
"curr": "some new",
310+
},
311+
{
312+
"diff": [
313+
0,
314+
" string",
315+
],
316+
},
317+
{
318+
"diff": [
319+
0,
320+
" with patch!",
321+
],
322+
},
323+
{},
324+
]
325+
`);
326+
327+
const values = [];
328+
for await (const v of readStreamableValue(value)) {
329+
values.push(v);
330+
}
331+
expect(values).toMatchInlineSnapshot(`
332+
[
333+
"hello",
334+
"hello world",
335+
"hello world!!",
336+
"some new",
337+
"some new string",
338+
"some new string with patch!",
339+
]
340+
`);
341+
});
342+
343+
it('should behave like .update() with .append() and .done()', async () => {
344+
const streamable = createStreamableValue('hello');
345+
const value = streamable.value;
346+
347+
streamable.append(' world');
348+
streamable.done('fin');
349+
350+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
351+
[
352+
{
353+
"curr": "hello",
354+
"type": Symbol(ui.streamable.value),
355+
},
356+
{
357+
"diff": [
358+
0,
359+
" world",
360+
],
361+
},
362+
{
363+
"curr": "fin",
364+
},
365+
]
366+
`);
367+
368+
const values = [];
369+
for await (const v of readStreamableValue(value)) {
370+
values.push(v);
371+
}
372+
expect(values).toMatchInlineSnapshot(`
373+
[
374+
"hello",
375+
"hello world",
376+
"fin",
377+
]
378+
`);
379+
});
380+
});
381+
382+
describe('readableStream', () => {
383+
it('should be able to accept readableStream as the source', async () => {
384+
const streamable = createStreamableValue(
385+
new ReadableStream({
386+
start(controller) {
387+
controller.enqueue('hello');
388+
controller.enqueue(' world');
389+
controller.enqueue('!');
390+
controller.close();
391+
},
392+
}),
393+
);
394+
const value = streamable.value;
395+
396+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
397+
[
398+
{
399+
"curr": undefined,
400+
"type": Symbol(ui.streamable.value),
401+
},
402+
{
403+
"curr": "hello",
404+
},
405+
{
406+
"diff": [
407+
0,
408+
" world",
409+
],
410+
},
411+
{
412+
"diff": [
413+
0,
414+
"!",
415+
],
416+
},
417+
{},
418+
]
419+
`);
420+
421+
const values = [];
422+
for await (const v of readStreamableValue(value)) {
423+
values.push(v);
424+
}
425+
expect(values).toMatchInlineSnapshot(`
426+
[
427+
"hello",
428+
"hello world",
429+
"hello world!",
430+
]
431+
`);
432+
});
433+
434+
it('should accept readableStream with JSON payloads', async () => {
435+
const streamable = createStreamableValue(
436+
new ReadableStream({
437+
start(controller) {
438+
controller.enqueue({ v: 1 });
439+
controller.enqueue({ v: 2 });
440+
controller.enqueue({ v: 3 });
441+
controller.close();
442+
},
443+
}),
444+
);
445+
const value = streamable.value;
446+
447+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
448+
[
449+
{
450+
"curr": undefined,
451+
"type": Symbol(ui.streamable.value),
452+
},
453+
{
454+
"curr": {
455+
"v": 1,
456+
},
457+
},
458+
{
459+
"curr": {
460+
"v": 2,
461+
},
462+
},
463+
{
464+
"curr": {
465+
"v": 3,
466+
},
467+
},
468+
{},
469+
]
470+
`);
471+
472+
const values = [];
473+
for await (const v of readStreamableValue(value)) {
474+
values.push(v);
475+
}
476+
expect(values).toMatchInlineSnapshot(`
477+
[
478+
{
479+
"v": 1,
480+
},
481+
{
482+
"v": 2,
483+
},
484+
{
485+
"v": 3,
486+
},
487+
]
488+
`);
489+
});
490+
491+
it('should lock the streamable if from readableStream', async () => {
492+
const streamable = createStreamableValue(
493+
new ReadableStream({
494+
async start(controller) {
495+
await nextTick();
496+
controller.enqueue('hello');
497+
controller.close();
498+
},
499+
}),
500+
);
501+
502+
expect(() =>
503+
streamable.update('world'),
504+
).toThrowErrorMatchingInlineSnapshot(
505+
'".update(): Value stream is locked and cannot be updated."',
506+
);
507+
});
231508
});
232509
});

0 commit comments

Comments
 (0)