-
Notifications
You must be signed in to change notification settings - Fork 6.7k
Closed
Labels
bugSomething isn't workingSomething isn't workingtriageIssue needs to be triaged/prioritizedIssue needs to be triaged/prioritized
Description
Bug Description
Workflow streaming has a breaking change at some point between 0.11.7 & 0.11.20. Code provided in Steps to Reproduce works in 0.11.7 and produces the following output:
Running workflow
Step one is happening
0
Step 0 is happening
1
Step 1 is happening
2
Step 2 is happening
3
Step 3 is happening
4
Step 4 is happening
Second step is happening
Step three is happening
Final result Workflow complete.Python 3.11.9 was used both times.
Version
0.11.20
Steps to Reproduce
Run the following code using Python 3.11.9:
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
Event,
Context,
)
import asyncio
import time
class FirstEvent(Event):
first_output: str
class SecondEvent(Event):
second_output: str
class ProgressEvent(Event):
msg: str
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
await asyncio.sleep(0)
for i in range(10):
time.sleep(1)
print(i)
ctx.write_event_to_stream(ProgressEvent(msg=f"Step {i} is happening"))
await asyncio.sleep(0)
return FirstEvent(first_output="First step complete.")
@step
async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
ctx.write_event_to_stream(ProgressEvent(msg=f"Second step is happening"))
return SecondEvent(second_output="Second step complete.")
@step
async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
return StopEvent(result="Workflow complete.")
async def main():
w = MyWorkflow(timeout=30, verbose=False)
r = asyncio.create_task(w.run())
print('Running workflow')
async for ev in w.stream_events():
if isinstance(ev, ProgressEvent):
print(ev.msg)
final_result = await r
print("Final result", final_result)
if __name__ == "__main__":
asyncio.run(main())Relevant Logs/Tracbacks
Traceback (most recent call last):
File "/Users/me/demo/src/test.py", line 64, in <module>
asyncio.run(main())
File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users//Users/me/demo/src/test.py", line 52, in main
r = asyncio.create_task(w.run())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/tasks.py", line 384, in create_task
task = loop.create_task(coro)
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/me/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 437, in create_task
task = tasks.Task(coro, loop=self, name=name, context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: a coroutine was expected, got <WorkflowHandler pending cb=[Dispatcher.span.<locals>.wrapper.<locals>.handle_future_result(span_id='Workflow.run...-545c508246a8', bound_args=<BoundArguments ()>, instance=<__main__.MyW...t 0x127debf10>, context=<_contextvars...t 0x12671bd00>)() at /Users/me/Library/Caches/pypoetry/virtualenvs/demo-n-PaIroy-py3.11/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py:273]>Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't workingtriageIssue needs to be triaged/prioritizedIssue needs to be triaged/prioritized