feat: truly limit time range by split window #6295
Conversation
Force-pushed from 7403cce to c26930d
feat: truly limit time range by split window Update src/flow/src/batching_mode/state.rs Co-authored-by: Lei, HUANG <[email protected]> Signed-off-by: discord9 <[email protected]>
Force-pushed from c26930d to 0fb11c4
src/flow/src/batching_mode/engine.rs (outdated)

@@ -370,7 +370,7 @@ impl BatchingEngine {
     let task = self.tasks.read().await.get(&flow_id).cloned();
     let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

-    task.mark_all_windows_as_dirty()?;
+    // task.mark_all_windows_as_dirty()?;
Sorry, why is this line of code commented out?
Added an explanation in a comment. Updating the entire time range is usually prohibitively expensive (unless no time window is found, which is also handled now), so this changes the flush to only update dirty time windows instead. This behavior shouldn't be breaking, though, as dirty time windows are still updated.
bugbot run
Bug: Window Splitting Logic Errors
The `gen_filter_exprs` function's window splitting logic has three issues:

- A division-by-zero vulnerability exists when calculating `times` if `window_size.num_seconds()` is zero.
- The condition `surplus.num_seconds() < window_size.num_seconds()` is too strict; it should be `<=` to prevent unnecessary splitting when `surplus` is exactly one `window_size`.
- Casting the `i64` variable `times` to `i32` for the `split_offset` calculation can lead to integer overflow, resulting in incorrect split points.
src/flow/src/batching_mode/state.rs#L266-L274
greptimedb/src/flow/src/batching_mode/state.rs
Lines 266 to 274 in ad11b3e
// split at window_size * times
let surplus = max_time_range - cur_time_range;
if surplus.num_seconds() < window_size.num_seconds() {
    // Skip splitting if surplus is smaller than window_size
    break;
}
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
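All three issues can be avoided with a zero-size guard, a `<=` comparison, and keeping the arithmetic in `i64`. A minimal sketch, using plain second counts instead of the actual `Duration` type; the function name and signature below are hypothetical, not the repository's API:

```rust
/// Compute the offset (in seconds) at which to split a dirty window,
/// given the current range, the allowed maximum, and the window size.
/// Returns None when no split is needed or possible.
fn split_offset_secs(cur_range: i64, max_range: i64, window_size: i64) -> Option<i64> {
    // Guard against division by zero for a degenerate window size.
    if window_size <= 0 {
        return None;
    }
    let surplus = max_range - cur_range;
    // Use <= so a surplus of exactly one window size is not split further.
    if surplus <= window_size {
        return None;
    }
    // Keep the multiplication in i64 to avoid the i32 cast overflow.
    let times = surplus / window_size;
    window_size.checked_mul(times)
}
```

With this guard, a zero `window_size` short-circuits before the division, and a surplus equal to one `window_size` is left unsplit.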
Bug: Flow Flush Command Fails to Process All Data
The removal of `task.mark_all_windows_as_dirty()?;` from `flush_flow_inner` fundamentally changes its behavior. Previously, this ensured all time windows were marked dirty, forcing a complete recalculation and flush of all relevant data. Now, `flush_flow_inner` only processes already dirty windows, breaking the `ADMIN FLUSH_FLOW` command's expected functionality to flush all data. This can lead to missed data and empty results in scenarios where a full flush is expected.
src/flow/src/batching_mode/engine.rs#L369-L403
greptimedb/src/flow/src/batching_mode/engine.rs
Lines 369 to 403 in ad11b3e
/// As flush the whole time range is usually prohibitively expensive.
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
    debug!("Try flush flow {flow_id}");
    let task = self.tasks.read().await.get(&flow_id).cloned();
    let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
    let time_window_size = task
        .config
        .time_window_expr
        .as_ref()
        .and_then(|expr| *expr.time_window_size());
    let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
        task.state
            .read()
            .unwrap()
            .dirty_time_windows
            .effective_count(&time_window_size)
    });
    let res = task
        .gen_exec_once(
            &self.query_engine,
            &self.frontend_client,
            cur_dirty_window_cnt,
        )
        .await?;
    let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
    debug!(
        "Successfully flush flow {flow_id}, affected rows={}",
        affected_rows
    );
    Ok(affected_rows)
}
Pull Request Overview
Enhance batching mode to split large dirty time windows into smaller ones, capping query spans by a configurable window count, while updating related metrics and tests.
- Add a `max_window_cnt` parameter to batching task and engine flush to limit windows per query.
- Implement window-splitting logic in `DirtyTimeWindows` and expose `effective_count`.
- Rename metrics to reflect window size and update test SQL/result files for deterministic timestamps.
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| tests/cases/standalone/common/flow/flow_rebuild.sql | Use fixed `0::TimestampSecond` and `SELECT *` in SQL tests |
| tests/cases/standalone/common/flow/flow_rebuild.result | Update expected columns for `SELECT *` output |
| tests/cases/standalone/common/flow/flow_flush.result | Remove legacy table output and replace with placeholders |
| src/flow/src/metrics.rs | Rename histogram metric and add stalled window size metric |
| src/flow/src/batching_mode/task.rs | Add `max_window_cnt` arg to `gen_exec_once` and `gen_insert_plan` |
| src/flow/src/batching_mode/state.rs | Implement window splitting, add `effective_count`, update metrics |
| src/flow/src/batching_mode/engine.rs | Pass `max_window_cnt` into `gen_exec_once` during flush |
Comments suppressed due to low confidence (3)
src/flow/src/batching_mode/state.rs:264
- [nitpick] The variable `x` is not descriptive. Consider renaming it to something like `duration` or `window_duration` for clarity.
let Some(x) = end.sub(start) else {
tests/cases/standalone/common/flow/flow_flush.result:45
- It looks like stray '+++' and '++' lines were inserted into the expected result file. Please remove these to restore the correct expected output or replace them with the accurate table delimiter syntax.
++
src/flow/src/batching_mode/state.rs:639
- [nitpick] This commented-out slice of `testcases` appears to be leftover debug code. Consider removing it to keep the test suite clean.
// let len = testcases.len();
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
As the title says: split large dirty time windows into smaller ones for querying, so that each flow can run a moderately sized query without waiting too long for it to finish, and avoid repeated computation caused by querying too eagerly.
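The splitting idea can be sketched as chunking a dirty range so that each query covers at most `max_window_cnt` windows. This is an illustrative sketch in plain second counts; the function name is hypothetical, and the real logic lives in `DirtyTimeWindows`:

```rust
/// Split the range [start, end) (in seconds) into consecutive chunks,
/// each spanning at most `max_window_cnt` windows of `window_size` seconds.
/// Hypothetical sketch, not the actual implementation.
fn split_time_range(start: i64, end: i64, window_size: i64, max_window_cnt: i64) -> Vec<(i64, i64)> {
    let mut chunks = Vec::new();
    // Degenerate inputs produce no chunks.
    if window_size <= 0 || max_window_cnt <= 0 || start >= end {
        return chunks;
    }
    // Maximum span a single query is allowed to cover.
    let step = window_size * max_window_cnt;
    let mut cur = start;
    while cur < end {
        let next = (cur + step).min(end);
        chunks.push((cur, next));
        cur = next;
    }
    chunks
}
```

Each chunk then becomes one bounded query, so a flow that fell far behind catches up in moderate steps instead of issuing one query over the whole backlog.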
PR Checklist
Please convert it to a draft if some of the following conditions are not met.