feat: truly limit time range by split window #6295

Open
wants to merge 8 commits into
base: main
Contributor

@discord9 discord9 commented Jun 11, 2025

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

As the title says: split large dirty time windows into smaller ones for querying, so that each flow runs a moderately sized query instead of waiting too long for a huge one to finish, and avoids repeated computation caused by querying too eagerly.
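
To make the idea concrete, here is a minimal sketch of capping one dirty range to a fixed number of windows per query. It is not the code in this PR: `cap_dirty_range`, its parameters, and the use of plain `i64` second timestamps are assumptions chosen for illustration.

```rust
/// Hypothetical helper (not from the PR): take at most `max_window_cnt`
/// windows of `window_secs` seconds from the front of the dirty range
/// `[start, end)`. Returns the part to query now and, if anything is left,
/// the part that stays dirty for a later run.
fn cap_dirty_range(
    start: i64,
    end: i64,
    window_secs: i64,
    max_window_cnt: i64,
) -> ((i64, i64), Option<(i64, i64)>) {
    // Degenerate inputs: fall back to querying the whole range unchanged.
    if window_secs <= 0 || max_window_cnt <= 0 || end <= start {
        return ((start, end), None);
    }
    // Saturating arithmetic so very large inputs cannot overflow.
    let budget = window_secs.saturating_mul(max_window_cnt);
    let split = start.saturating_add(budget);
    if split >= end {
        // The whole range already fits in the budget.
        ((start, end), None)
    } else {
        ((start, split), Some((split, end)))
    }
}
```

With a 10-second window and a cap of 3 windows, a dirty range of `[0, 100)` would be queried as `[0, 30)` now, and `[30, 100)` would remain dirty.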

PR Checklist

Please convert it to a draft if some of the following conditions are not met.

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.
  • API changes are backward compatible.
  • Schema or data changes are backward compatible.

@github-actions github-actions bot added the docs-not-required This change does not impact docs. label Jun 11, 2025
@v0y4g3r v0y4g3r self-requested a review June 11, 2025 08:13
@discord9 discord9 force-pushed the flow/split_large_window_for_query branch from 7403cce to c26930d Compare June 11, 2025 10:03
@discord9 discord9 marked this pull request as ready for review June 11, 2025 11:55
discord9 and others added 2 commits June 11, 2025 19:56
feat: truly limit time range by split window

Update src/flow/src/batching_mode/state.rs

Co-authored-by: Lei, HUANG <[email protected]>
Signed-off-by: discord9 <[email protected]>
@discord9 discord9 force-pushed the flow/split_large_window_for_query branch from c26930d to 0fb11c4 Compare June 11, 2025 12:14
@waynexia waynexia requested a review from Copilot June 12, 2025 06:46
Copilot

This comment was marked as outdated.

@discord9 discord9 requested a review from a team as a code owner June 13, 2025 07:41
discord9 added 3 commits June 13, 2025 17:38
Signed-off-by: discord9 <[email protected]>
@@ -370,7 +370,7 @@ impl BatchingEngine {
         let task = self.tasks.read().await.get(&flow_id).cloned();
         let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

-        task.mark_all_windows_as_dirty()?;
+        // task.mark_all_windows_as_dirty()?;
Contributor

Sorry, why is this line of code commented out?

Contributor Author

> Sorry, why is this line of code commented out?

Added an explanation in a comment. Updating the entire time range is usually prohibitively expensive (unless no time window is found, which is also handled now), so flush now only updates dirty time windows. This shouldn't be a breaking change, since the dirty time windows are still updated.

Contributor

v0y4g3r commented Jun 17, 2025

bugbot run

@cursor cursor bot left a comment

Bug: Window Splitting Logic Errors

The gen_filter_exprs function's window splitting logic has three issues:

  1. A division-by-zero vulnerability exists when calculating times if window_size.num_seconds() is zero.
  2. The condition surplus.num_seconds() < window_size.num_seconds() is too strict; it should be <= to prevent unnecessary splitting when surplus is exactly one window_size.
  3. Casting the i64 times variable to i32 for split_offset calculation can lead to integer overflow, resulting in incorrect split points.

src/flow/src/batching_mode/state.rs#L266-L274

    // split at window_size * times
    let surplus = max_time_range - cur_time_range;
    if surplus.num_seconds() < window_size.num_seconds() {
        // Skip splitting if surplus is smaller than window_size
        break;
    }
    let times = surplus.num_seconds() / window_size.num_seconds();
    let split_offset = window_size * times as i32;
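
One possible way to address issues 1 and 3 is sketched below. It is an illustration only, not the fix adopted in this PR: `split_offset_secs` is a hypothetical helper that works on whole seconds as `i64` rather than on the duration type used in `state.rs`. It keeps the original `<` comparison, so it takes no position on issue 2.

```rust
/// Hypothetical helper (not from the PR): compute the split offset in
/// seconds, i.e. the largest multiple of `window_secs` that fits in
/// `surplus_secs`. Returns `None` when no split should happen.
fn split_offset_secs(surplus_secs: i64, window_secs: i64) -> Option<i64> {
    // Issue 1: a zero (or negative) window size would divide by zero
    // or produce nonsense, so bail out before dividing.
    // Same skip condition as the snippet above: surplus smaller than one window.
    if window_secs <= 0 || surplus_secs < window_secs {
        return None;
    }
    // Issue 3: stay in i64 throughout instead of casting `times` to i32.
    // `times * window_secs` never exceeds `surplus_secs`, so it cannot overflow.
    let times = surplus_secs / window_secs;
    Some(times * window_secs)
}
```

For example, a surplus of 25 seconds with a 10-second window gives an offset of 20 seconds, and a zero window size gives `None` instead of a panic.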

Bug: Flow Flush Command Fails to Process All Data

The removal of task.mark_all_windows_as_dirty()?; from flush_flow_inner fundamentally changes its behavior. Previously, this ensured all time windows were marked dirty, forcing a complete recalculation and flush of all relevant data. Now, flush_flow_inner only processes already dirty windows, breaking the ADMIN FLUSH_FLOW command's expected functionality to flush all data. This can lead to missed data and empty results in scenarios where a full flush is expected.

src/flow/src/batching_mode/engine.rs#L369-L403

    /// As flush the whole time range is usually prohibitively expensive.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());
        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });
        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;
        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!(
            "Successfully flush flow {flow_id}, affected rows={}",
            affected_rows
        );
        Ok(affected_rows)
    }

Signed-off-by: discord9 <[email protected]>
@discord9 discord9 requested a review from Copilot June 20, 2025 08:13
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

Enhance batching mode to split large dirty time windows into smaller ones, capping query spans by a configurable window count, while updating related metrics and tests.

  • Add max_window_cnt parameter to batching task and engine flush to limit windows per query.
  • Implement window‐splitting logic in DirtyTimeWindows and expose effective_count.
  • Rename metrics to reflect window size and update test SQL/result files for deterministic timestamps.
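
As a rough illustration of what an "effective" window count could mean, the sketch below counts how many windows of a given size are needed to cover a set of dirty ranges. This is an assumption about the semantics, not the actual `effective_count` in `DirtyTimeWindows`; the function name, the `(start, end)` tuple representation, and the `i64` second timestamps are all hypothetical.

```rust
/// Hypothetical sketch (not the PR's `effective_count`): number of windows
/// of `window_secs` seconds needed to cover every dirty range `[start, end)`.
fn effective_window_count(ranges: &[(i64, i64)], window_secs: i64) -> u64 {
    // A non-positive window size cannot cover anything meaningfully.
    if window_secs <= 0 {
        return 0;
    }
    let mut total = 0u64;
    for &(start, end) in ranges {
        // Skip empty or inverted ranges.
        if end <= start {
            continue;
        }
        let len = end.saturating_sub(start);
        // Ceiling division: a partially filled window still counts as one.
        let windows = len / window_secs + i64::from(len % window_secs != 0);
        total += windows as u64;
    }
    total
}
```

Under this reading, one merged range spanning 25 seconds with a 10-second window counts as 3 windows, which is the kind of figure a per-query cap such as `max_window_cnt` would be compared against.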

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| tests/cases/standalone/common/flow/flow_rebuild.sql | Use fixed 0::TimestampSecond and SELECT * in SQL tests |
| tests/cases/standalone/common/flow/flow_rebuild.result | Update expected columns for SELECT * output |
| tests/cases/standalone/common/flow/flow_flush.result | Remove legacy table output and replace with placeholders |
| src/flow/src/metrics.rs | Rename histogram metric and add stalled window size metric |
| src/flow/src/batching_mode/task.rs | Add max_window_cnt arg to gen_exec_once and gen_insert_plan |
| src/flow/src/batching_mode/state.rs | Implement window splitting, add effective_count, update metrics |
| src/flow/src/batching_mode/engine.rs | Pass max_window_cnt into gen_exec_once during flush |
Comments suppressed due to low confidence (3)

src/flow/src/batching_mode/state.rs:264

  • [nitpick] The variable x is not descriptive. Consider renaming it to something like duration or window_duration for clarity.
            let Some(x) = end.sub(start) else {

tests/cases/standalone/common/flow/flow_flush.result:45

  • It looks like stray '+++' and '++' lines were inserted into the expected result file. Please remove these to restore the correct expected output or replace them with the accurate table delimiter syntax.
++

src/flow/src/batching_mode/state.rs:639

  • [nitpick] This commented‐out slice of testcases appears to be leftover debug code. Consider removing it to keep the test suite clean.
        // let len = testcases.len();

Labels
docs-not-required This change does not impact docs. size/M
3 participants