Skip to content

Commit 1104aa5

Browse files
committed
[Enhancement] ignore the exec_mem_limit in pipeline execution (#34120)
Signed-off-by: Murphy <[email protected]> Signed-off-by: Murphy <[email protected]> (cherry picked from commit 915d762)
1 parent 49544bf commit 1104aa5

File tree

7 files changed

+24
-49
lines changed

7 files changed

+24
-49
lines changed

be/src/exec/pipeline/fragment_executor.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
218218
const auto& query_globals = request.common().query_globals;
219219
const auto& query_options = request.common().query_options;
220220
const auto& t_desc_tbl = request.common().desc_tbl;
221-
const int32_t degree_of_parallelism = _calc_dop(exec_env, request);
222221
auto& wg = _wg;
223222

224223
_fragment_ctx->set_runtime_state(
@@ -228,17 +227,17 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
228227
runtime_state->set_fragment_ctx(_fragment_ctx.get());
229228
runtime_state->set_query_ctx(_query_ctx);
230229

230+
// Only consider the `query_mem_limit` variable
231+
// If query_mem_limit is <= 0, it would set to -1, which means no limit
231232
auto* parent_mem_tracker = wg->mem_tracker();
232-
auto per_instance_mem_limit = query_options.__isset.mem_limit ? query_options.mem_limit : -1;
233-
auto option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
234-
int64_t query_mem_limit = _query_ctx->compute_query_mem_limit(parent_mem_tracker->limit(), per_instance_mem_limit,
235-
degree_of_parallelism, option_query_mem_limit);
233+
int64_t option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
234+
if (option_query_mem_limit <= 0) option_query_mem_limit = -1;
236235
int64_t big_query_mem_limit = wg->use_big_query_mem_limit() ? wg->big_query_mem_limit() : -1;
237236
int64_t spill_mem_limit_bytes = -1;
238237
if (query_options.__isset.enable_spill && query_options.enable_spill == true) {
239-
spill_mem_limit_bytes = query_mem_limit * query_options.spill_mem_limit_threshold;
238+
spill_mem_limit_bytes = option_query_mem_limit * query_options.spill_mem_limit_threshold;
240239
}
241-
_query_ctx->init_mem_tracker(query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
240+
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
242241
wg.get());
243242

244243
auto query_mem_tracker = _query_ctx->mem_tracker();

be/src/exec/pipeline/query_context.cpp

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,31 +98,6 @@ void QueryContext::cancel(const Status& status) {
9898
_fragment_mgr->cancel(status);
9999
}
100100

101-
int64_t QueryContext::compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit,
102-
size_t pipeline_dop, int64_t option_query_mem_limit) {
103-
// no mem_limit
104-
if (per_instance_mem_limit <= 0 && option_query_mem_limit <= 0) {
105-
return -1;
106-
}
107-
108-
int64_t mem_limit;
109-
if (option_query_mem_limit > 0) {
110-
mem_limit = option_query_mem_limit;
111-
} else {
112-
mem_limit = per_instance_mem_limit;
113-
// query's mem_limit = per-instance mem_limit * num_instances * pipeline_dop
114-
static constexpr int64_t MEM_LIMIT_MAX = std::numeric_limits<int64_t>::max();
115-
if (MEM_LIMIT_MAX / total_fragments() / pipeline_dop > mem_limit) {
116-
mem_limit *= static_cast<int64_t>(total_fragments()) * pipeline_dop;
117-
} else {
118-
mem_limit = MEM_LIMIT_MAX;
119-
}
120-
}
121-
122-
// query's mem_limit never exceeds its parent's limit if it exists
123-
return parent_mem_limit == -1 ? mem_limit : std::min(parent_mem_limit, mem_limit);
124-
}
125-
126101
void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
127102
int64_t spill_mem_limit, workgroup::WorkGroup* wg) {
128103
std::call_once(_init_mem_tracker_once, [=]() {

be/src/exec/pipeline/query_context.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
126126
DCHECK(_desc_tbl != nullptr);
127127
return _desc_tbl;
128128
}
129-
// If option_query_mem_limit > 0, use it directly.
130-
// Otherwise, use per_instance_mem_limit * num_fragments * pipeline_dop.
131-
int64_t compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit, size_t pipeline_dop,
132-
int64_t option_query_mem_limit);
129+
133130
size_t total_fragments() { return _total_fragments; }
134131
/// Initialize the mem_tracker of this query.
135132
/// Positive `big_query_mem_limit` and non-null `wg` indicate

be/src/exec/scan_node.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
#include "exec/scan_node.h"
3636

37+
#include "exec/pipeline/query_context.h"
3738
#include "exec/pipeline/scan/morsel.h"
3839

3940
namespace starrocks {
@@ -59,7 +60,14 @@ Status ScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
5960
if (options.__isset.scan_use_query_mem_ratio) {
6061
mem_ratio = options.scan_use_query_mem_ratio;
6162
}
62-
_mem_limit = state->query_mem_tracker_ptr()->limit() * mem_ratio;
63+
if (runtime_state()->query_ctx()) {
64+
// Used in pipeline-engine
65+
_mem_limit = state->query_ctx()->get_static_query_mem_limit() * mem_ratio;
66+
} else if (runtime_state()->query_mem_tracker_ptr()) {
67+
// Fallback in non-pipeline
68+
_mem_limit = state->query_mem_tracker_ptr()->limit() * mem_ratio;
69+
}
70+
6371
return Status::OK();
6472
}
6573

be/test/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ set(EXEC_FILES
3939
./exec/pipeline/pipeline_file_scan_node_test.cpp
4040
./exec/pipeline/pipeline_test_base.cpp
4141
./exec/pipeline/query_context_manger_test.cpp
42-
./exec/pipeline/query_context_test.cpp
4342
./exec/pipeline/table_function_operator_test.cpp
4443
./exec/query_cache/query_cache_test.cpp
4544
./exec/query_cache/transform_operator.cpp

fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
9797

9898
/**
9999
* The mem limit of query on BE. It takes effects only when enabling pipeline engine.
100-
* - If `query_mem_limit` > 0, use it to limit the memory of a query.
101-
* The memory a query able to be used is just `query_mem_limit`.
102-
* - Otherwise, use `exec_mem_limit` to limit the memory of a query.
103-
* The memory a query able to be used is `exec_mem_limit * num_fragments * pipeline_dop`.
104-
* To maintain compatibility, the default value is 0.
100+
* If `query_mem_limit` > 0, use it to limit the memory of a query.
101+
* Otherwise, no limitation
105102
*/
106103
public static final String QUERY_MEM_LIMIT = "query_mem_limit";
107104

@@ -671,7 +668,9 @@ public static MaterializedViewRewriteMode parse(String str) {
671668
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN)
672669
private boolean enableSharedScan = false;
673670

674-
// max memory used on every backend.
671+
// max memory used on each fragment instance
672+
// NOTE: only used for non-pipeline engine and stream_load
673+
// The pipeline engine uses the query_mem_limit
675674
public static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L;
676675
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, flag = VariableMgr.INVISIBLE)
677676
public long maxExecMemByte = DEFAULT_EXEC_MEM_LIMIT;
@@ -2857,9 +2856,7 @@ public void setSkewJoinRandRange(int skewJoinRandRange) {
28572856
public TQueryOptions toThrift() {
28582857
TQueryOptions tResult = new TQueryOptions();
28592858
tResult.setMem_limit(maxExecMemByte);
2860-
if (queryMemLimit > 0) {
2861-
tResult.setQuery_mem_limit(queryMemLimit);
2862-
}
2859+
tResult.setQuery_mem_limit(queryMemLimit);
28632860

28642861
// Avoid integer overflow
28652862
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));

fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void testFromBrokerLoadJobSpec() throws Exception {
219219
jobSpec = coordinator.getJobSpec();
220220
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
221221
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
222-
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
222+
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
223223
}
224224

225225
@Test
@@ -289,7 +289,7 @@ public void testFromStreamLoadJobSpec() throws Exception {
289289
jobSpec = coordinator.getJobSpec();
290290
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
291291
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
292-
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
292+
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
293293
}
294294

295295
@Test

0 commit comments

Comments
 (0)