Skip to content

Commit b8b0cb8

Browse files
murphyatworkZhangg7723
authored andcommitted
[Enhancement] ignore the exec_mem_limit in pipeline execution (StarRocks#34120)
Signed-off-by: Murphy <[email protected]> Signed-off-by: Murphy <[email protected]> Signed-off-by: 张敢 <[email protected]>
1 parent b2974e6 commit b8b0cb8

File tree

7 files changed

+16
-99
lines changed

7 files changed

+16
-99
lines changed

be/src/exec/pipeline/fragment_executor.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
220220
const auto& query_globals = request.common().query_globals;
221221
const auto& query_options = request.common().query_options;
222222
const auto& t_desc_tbl = request.common().desc_tbl;
223-
const int32_t degree_of_parallelism = _calc_dop(exec_env, request);
224223
auto& wg = _wg;
225224

226225
_fragment_ctx->set_runtime_state(
@@ -230,17 +229,17 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
230229
runtime_state->set_fragment_ctx(_fragment_ctx.get());
231230
runtime_state->set_query_ctx(_query_ctx);
232231

232+
// Only consider the `query_mem_limit` variable
233+
// If query_mem_limit is <= 0, it would set to -1, which means no limit
233234
auto* parent_mem_tracker = wg->mem_tracker();
234-
auto per_instance_mem_limit = query_options.__isset.mem_limit ? query_options.mem_limit : -1;
235-
auto option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
236-
int64_t query_mem_limit = _query_ctx->compute_query_mem_limit(parent_mem_tracker->limit(), per_instance_mem_limit,
237-
degree_of_parallelism, option_query_mem_limit);
235+
int64_t option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
236+
if (option_query_mem_limit <= 0) option_query_mem_limit = -1;
238237
int64_t big_query_mem_limit = wg->use_big_query_mem_limit() ? wg->big_query_mem_limit() : -1;
239238
int64_t spill_mem_limit_bytes = -1;
240-
if (query_options.__isset.enable_spill && query_options.enable_spill == true) {
241-
spill_mem_limit_bytes = query_mem_limit * query_options.spill_mem_limit_threshold;
239+
if (query_options.__isset.enable_spill && query_options.enable_spill && option_query_mem_limit > 0) {
240+
spill_mem_limit_bytes = option_query_mem_limit * query_options.spill_mem_limit_threshold;
242241
}
243-
_query_ctx->init_mem_tracker(query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
242+
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
244243
wg.get());
245244

246245
auto query_mem_tracker = _query_ctx->mem_tracker();

be/src/exec/pipeline/query_context.cpp

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -98,30 +98,6 @@ void QueryContext::cancel(const Status& status) {
9898
_fragment_mgr->cancel(status);
9999
}
100100

101-
int64_t QueryContext::compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit,
102-
size_t pipeline_dop, int64_t option_query_mem_limit) {
103-
// no mem_limit
104-
if (per_instance_mem_limit <= 0 && option_query_mem_limit <= 0) {
105-
return -1;
106-
}
107-
108-
int64_t mem_limit;
109-
if (option_query_mem_limit > 0) {
110-
mem_limit = option_query_mem_limit;
111-
} else {
112-
mem_limit = per_instance_mem_limit;
113-
// query's mem_limit = per-instance mem_limit * num_instances * pipeline_dop
114-
static constexpr int64_t MEM_LIMIT_MAX = std::numeric_limits<int64_t>::max();
115-
if (MEM_LIMIT_MAX / total_fragments() / pipeline_dop > mem_limit) {
116-
mem_limit *= static_cast<int64_t>(total_fragments()) * pipeline_dop;
117-
} else {
118-
mem_limit = MEM_LIMIT_MAX;
119-
}
120-
}
121-
122-
return mem_limit;
123-
}
124-
125101
void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
126102
int64_t spill_mem_limit, workgroup::WorkGroup* wg) {
127103
std::call_once(_init_mem_tracker_once, [=]() {

be/src/exec/pipeline/query_context.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
126126
DCHECK(_desc_tbl != nullptr);
127127
return _desc_tbl;
128128
}
129-
// If option_query_mem_limit > 0, use it directly.
130-
// Otherwise, use per_instance_mem_limit * num_fragments * pipeline_dop.
131-
int64_t compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit, size_t pipeline_dop,
132-
int64_t option_query_mem_limit);
129+
133130
size_t total_fragments() { return _total_fragments; }
134131
/// Initialize the mem_tracker of this query.
135132
/// Positive `big_query_mem_limit` and non-null `wg` indicate

be/test/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ set(EXEC_FILES
3939
./exec/pipeline/pipeline_file_scan_node_test.cpp
4040
./exec/pipeline/pipeline_test_base.cpp
4141
./exec/pipeline/query_context_manger_test.cpp
42-
./exec/pipeline/query_context_test.cpp
4342
./exec/pipeline/table_function_operator_test.cpp
4443
./exec/query_cache/query_cache_test.cpp
4544
./exec/query_cache/transform_operator.cpp

be/test/exec/pipeline/query_context_test.cpp

Lines changed: 0 additions & 51 deletions
This file was deleted.

fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
9797

9898
/**
9999
* The mem limit of query on BE. It takes effects only when enabling pipeline engine.
100-
* - If `query_mem_limit` > 0, use it to limit the memory of a query.
101-
* The memory a query able to be used is just `query_mem_limit`.
102-
* - Otherwise, use `exec_mem_limit` to limit the memory of a query.
103-
* The memory a query able to be used is `exec_mem_limit * num_fragments * pipeline_dop`.
104-
* To maintain compatibility, the default value is 0.
100+
* If `query_mem_limit` > 0, use it to limit the memory of a query.
101+
* Otherwise, no limitation
105102
*/
106103
public static final String QUERY_MEM_LIMIT = "query_mem_limit";
107104

@@ -681,7 +678,9 @@ public static MaterializedViewRewriteMode parse(String str) {
681678
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN)
682679
private boolean enableSharedScan = false;
683680

684-
// max memory used on every backend.
681+
// max memory used on each fragment instance
682+
// NOTE: only used for non-pipeline engine and stream_load
683+
// The pipeline engine uses the query_mem_limit
685684
public static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L;
686685
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, flag = VariableMgr.INVISIBLE)
687686
public long maxExecMemByte = DEFAULT_EXEC_MEM_LIMIT;
@@ -2932,9 +2931,7 @@ public void setSkewJoinRandRange(int skewJoinRandRange) {
29322931
public TQueryOptions toThrift() {
29332932
TQueryOptions tResult = new TQueryOptions();
29342933
tResult.setMem_limit(maxExecMemByte);
2935-
if (queryMemLimit > 0) {
2936-
tResult.setQuery_mem_limit(queryMemLimit);
2937-
}
2934+
tResult.setQuery_mem_limit(queryMemLimit);
29382935

29392936
// Avoid integer overflow
29402937
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));

fe/fe-core/src/test/java/com/starrocks/qe/scheduler/JobSpecTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void testFromBrokerLoadJobSpec() throws Exception {
219219
jobSpec = coordinator.getJobSpec();
220220
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
221221
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
222-
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
222+
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
223223
}
224224

225225
@Test
@@ -289,7 +289,7 @@ public void testFromStreamLoadJobSpec() throws Exception {
289289
jobSpec = coordinator.getJobSpec();
290290
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
291291
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
292-
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
292+
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
293293
}
294294

295295
@Test

0 commit comments

Comments
 (0)