-
Notifications
You must be signed in to change notification settings - Fork 5k
enh: add and apply trigger latency config #33245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary of ChangesHello @dapan1121, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces significant enhancements to stream processing by providing more granular control over latency. It allows for explicit configuration of low-latency calculations and batch request wait times, which directly influences the frequency and speed at which stream data is processed and outputted. This change aims to reduce delays and improve the responsiveness of stream-based operations. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new configuration streamBatchRequestWaitMs to control trigger latency in streams, replacing a hardcoded value. The changes are well-implemented across the configuration, messaging, and stream processing logic. I've pointed out a logic inconsistency between two files and a magic number that should be a constant for better maintainability. Overall, this is a good enhancement.
| } | ||
| if (initPendingSize == 0 && pGroup->pPendingCalcParams.neles > 0) { | ||
| int64_t t = pTask->lowLatencyCalc ? now : now + STREAM_TRIGGER_BATCH_WINDOW_WAIT_NS; | ||
| int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for determining low latency mode is inconsistent with streamRunner.c. In streamRunner.c, low latency is enabled if pTask->lowLatencyCalc is true OR tsStreamBatchRequestWaitMs < 1000. However, here only pTask->lowLatencyCalc is checked to decide whether to wait. This could lead to unexpected delays even when a global low latency is intended via tsStreamBatchRequestWaitMs.
To ensure consistency, the condition should be the same as in streamRunner.c.
int64_t t = (pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000)) ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);| nextExecTime = TMIN(nextExecTime, now); | ||
| } else if (pGroup->pPendingCalcParams.neles > 0) { | ||
| int64_t t = pTask->lowLatencyCalc ? now : now + STREAM_TRIGGER_BATCH_WINDOW_WAIT_NS; | ||
| int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for determining low latency mode is inconsistent with streamRunner.c. In streamRunner.c, low latency is enabled if pTask->lowLatencyCalc is true OR tsStreamBatchRequestWaitMs < 1000. However, here only pTask->lowLatencyCalc is checked to decide whether to wait. This could lead to unexpected delays even when a global low latency is intended via tsStreamBatchRequestWaitMs.
To ensure consistency, the condition should be the same as in streamRunner.c.
int64_t t = (pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000)) ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);|
|
||
| if (pTask->notification.calcNotifyOnly) return code; | ||
|
|
||
| bool lowLatencyCalc = pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The magic number 1000 is used here as a threshold to determine if low latency mode should be enabled based on tsStreamBatchRequestWaitMs. To improve code readability and maintainability, it's better to define this value as a named constant, for example, STREAM_LOW_LATENCY_WAIT_MS_THRESHOLD, and define it at the top of the file or in a relevant header.
bool lowLatencyCalc = pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < STREAM_LOW_LATENCY_WAIT_MS_THRESHOLD);|
|
Description
Please briefly describe the code changes in this pull request.
Checklist
Please check the items in the checklist if applicable.