-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Inline response recording and remove process_items indirection #7310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
codex-rs/core/src/codex.rs
Outdated
| sess.record_conversation_items( | ||
| &turn_context, | ||
| std::slice::from_ref(&item), | ||
| ) | ||
| .await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid persisting partial items before stream retries
Stream items are written to conversation history immediately in the OutputItemDone handler (record_conversation_items at lines 2225-2229), but run_turn will retry the same turn on transient stream errors (see the retry loop at lines 2113-2137). If the SSE disconnects before ResponseEvent::Completed, the partial tool call or message is already persisted and will be included in the next prompt when the turn retries, producing duplicated/partial conversation history and misleading turn diffs. The retry should happen before committing history or roll back the writes when the turn is retried.
Useful? React with 👍 / 👎.
|
@codex review this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
|
||
| output.push_back( | ||
| async move { | ||
| Ok(ProcessedResponseItem { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I mean. we were pushing to output directly
| Ok(turn_output) => { | ||
| let processed_items = turn_output; | ||
| let TurnRunResult { | ||
| needs_follow_up, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need both of these? Can there be end of turn without assistant message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end_of_turn_message: Option<..> ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For OpenAI provider no. I am not sure about azure or why it was implemented like that originally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the API point of view, yes you can. But from a client point of view, a turn is stopped when we decide so it should be agnostic from the provider
| /// history and rollout stay in sync even if the turn is later cancelled. | ||
| pub(crate) type InFlightFuture<'f> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'f>>; | ||
|
|
||
| pub(crate) struct OutputItemResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it feel like we unnessesarily mixing tool (long running) and non-tool items?
This type is almost like a union, either we return a future or a possible end of turn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or reasoning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can have the match at an earlier layer. just wanted to decrease the code in codex.rs
| if let Some(agent_message) = output_result.last_agent_message { | ||
| last_agent_message = Some(agent_message); | ||
| } | ||
| needs_follow_up |= output_result.needs_follow_up; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this value always comes from the last item, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, ideally a tool call will be at the end if we need a follow up. Not the most correct if ResponsesAPI changed under us though
| } | ||
| } | ||
| Ok(None) => { | ||
| if let Some(turn_item) = handle_non_tool_response_item(&item).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We handle handle_non_tool_response_item for item_added in the main loop. Should we do the same for done?
|
Can you make sure this fixes the issue described here? |
| } | ||
|
|
||
| impl From<CancelErr> for CodexErr { | ||
| fn from(_: CancelErr) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we at least log the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we log errors. which part you mean?
codex/codex-rs/core/src/codex.rs
Lines 2163 to 2165 in 5e5fc7b
| Err(err) => { | |
| tracing::error!(error = ?err, "in-flight tool future failed during drain"); | |
| } |
| Ok(turn_output) => { | ||
| let processed_items = turn_output; | ||
| let TurnRunResult { | ||
| needs_follow_up, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the API point of view, yes you can. But from a client point of view, a turn is stopped when we decide so it should be agnostic from the provider
run_turnnow records items as they arrive instead of building aProcessedResponseItemlist and post‑processing viaprocess_items.handle_output_item_donereturns the follow‑up signal + optional tool future;needs_follow_upis set only there, and in‑flight tool futures are drained once at the end (errors logged, no extra state writes).process_itemsindirection and the extra output queuetool_parallelism::tool_results_groupedto allow any completion order while still requiring matching call/output IDs.