Skip to content

Conversation

@dapan1121
Copy link
Contributor

Description

Please briefly describe the code changes in this pull request.

Checklist

Please check the items in the checklist if applicable.

  • Is the user manual updated?
  • Are the test cases passed and automated?
  • Is there no significant decrease in test coverage?

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @dapan1121, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant enhancements to stream processing by providing more granular control over latency. It allows for explicit configuration of low-latency calculations and batch request wait times, which directly influences the frequency and speed at which stream data is processed and outputted. This change aims to reduce delays and improve the responsiveness of stream-based operations.

Highlights

  • New Low Latency Flag: A new boolean flag, lowLatencyCalc, has been introduced in SStreamRunnerDeployMsg and SStreamRunnerTask to explicitly enable low-latency calculation for stream processing tasks.
  • Configurable Batch Request Wait Time: A new global configuration parameter, tsStreamBatchRequestWaitMs, has been added. This parameter, with a default of 5000ms, allows for dynamic control over the waiting time for stream batch requests, replacing a previously hardcoded constant.
  • Enhanced Low-Latency Processing Logic: The stream runner's output merging and processing logic has been updated. Data blocks are now processed and outputted more frequently when lowLatencyCalc is enabled or when tsStreamBatchRequestWaitMs is set to a value less than 1000ms, ensuring quicker data availability.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new configuration streamBatchRequestWaitMs to control trigger latency in streams, replacing a hardcoded value. The changes are well-implemented across the configuration, messaging, and stream processing logic. I've pointed out a logic inconsistency between two files and a magic number that should be a constant for better maintainability. Overall, this is a good enhancement.

}
if (initPendingSize == 0 && pGroup->pPendingCalcParams.neles > 0) {
int64_t t = pTask->lowLatencyCalc ? now : now + STREAM_TRIGGER_BATCH_WINDOW_WAIT_NS;
int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for determining low latency mode is inconsistent with streamRunner.c. In streamRunner.c, low latency is enabled if pTask->lowLatencyCalc is true OR tsStreamBatchRequestWaitMs < 1000. However, here only pTask->lowLatencyCalc is checked to decide whether to wait. This could lead to unexpected delays even when a global low latency is intended via tsStreamBatchRequestWaitMs.

To ensure consistency, the condition should be the same as in streamRunner.c.

    int64_t t = (pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000)) ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);

nextExecTime = TMIN(nextExecTime, now);
} else if (pGroup->pPendingCalcParams.neles > 0) {
int64_t t = pTask->lowLatencyCalc ? now : now + STREAM_TRIGGER_BATCH_WINDOW_WAIT_NS;
int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for determining low latency mode is inconsistent with streamRunner.c. In streamRunner.c, low latency is enabled if pTask->lowLatencyCalc is true OR tsStreamBatchRequestWaitMs < 1000. However, here only pTask->lowLatencyCalc is checked to decide whether to wait. This could lead to unexpected delays even when a global low latency is intended via tsStreamBatchRequestWaitMs.

To ensure consistency, the condition should be the same as in streamRunner.c.

    int64_t t = (pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000)) ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC);


if (pTask->notification.calcNotifyOnly) return code;

bool lowLatencyCalc = pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The magic number 1000 is used here as a threshold to determine if low latency mode should be enabled based on tsStreamBatchRequestWaitMs. To improve code readability and maintainability, it's better to define this value as a named constant, for example, STREAM_LOW_LATENCY_WAIT_MS_THRESHOLD, and define it at the top of the file or in a relevant header.

  bool lowLatencyCalc = pTask->lowLatencyCalc || (tsStreamBatchRequestWaitMs < STREAM_LOW_LATENCY_WAIT_MS_THRESHOLD);

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ JinqingKuang
❌ dapan1121
You have signed the CLA already but the status is still pending? Let us recheck it.

@dapan1121 dapan1121 merged commit 0dc3023 into main Oct 14, 2025
11 of 13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants