Skip to content

[Bug]: Breaking Change Occurred in Workflow Streaming b/w 0.11.7 & 0.11.20 #16701

@brycecf

Description

@brycecf

Bug Description

Workflow streaming has a breaking change at some point between 0.11.7 & 0.11.20. Code provided in Steps to Reproduce works in 0.11.7 and produces the following output:

Running workflow
Step one is happening
0
Step 0 is happening
1
Step 1 is happening
2
Step 2 is happening
3
Step 3 is happening
4
Step 4 is happening
Second step is happening
Step three is happening
Final result Workflow complete.

Python 3.11.9 was used both times.

Version

0.11.20

Steps to Reproduce

Run the following code using Python 3.11.9:

from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
import asyncio
import time


class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str


class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        await asyncio.sleep(0)
        for i in range(10):
            time.sleep(1)
            print(i)
            ctx.write_event_to_stream(ProgressEvent(msg=f"Step {i} is happening"))
            await asyncio.sleep(0)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        ctx.write_event_to_stream(ProgressEvent(msg=f"Second step is happening"))
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")


async def main():
    w = MyWorkflow(timeout=30, verbose=False)
    r = asyncio.create_task(w.run())
    print('Running workflow')

    async for ev in w.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await r
    print("Final result", final_result)


if __name__ == "__main__":
    asyncio.run(main())

Relevant Logs/Tracbacks

Traceback (most recent call last):
  File "/Users/me/demo/src/test.py", line 64, in <module>
    asyncio.run(main())
  File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users//Users/me/demo/src/test.py", line 52, in main
    r = asyncio.create_task(w.run())
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/tasks.py", line 384, in create_task
    task = loop.create_task(coro)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 437, in create_task
    task = tasks.Task(coro, loop=self, name=name, context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: a coroutine was expected, got <WorkflowHandler pending cb=[Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-545c508246a8', bound_args=<BoundArguments ()>, instance=<__main__.MyW...t 0x127debf10>, context=<_contextvars...t 0x12671bd00>)() at /Users/me/Library/Caches/pypoetry/virtualenvs/demo-n-PaIroy-py3.11/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py:273]>

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingtriageIssue needs to be triaged/prioritized

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions