Skip to content

[Enhancement] ignore the exec_mem_limit in pipeline execution #34120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
const auto& query_globals = request.common().query_globals;
const auto& query_options = request.common().query_options;
const auto& t_desc_tbl = request.common().desc_tbl;
const int32_t degree_of_parallelism = _calc_dop(exec_env, request);
auto& wg = _wg;

_fragment_ctx->set_runtime_state(
Expand All @@ -227,17 +226,17 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
runtime_state->set_fragment_ctx(_fragment_ctx.get());
runtime_state->set_query_ctx(_query_ctx);

// Only consider the `query_mem_limit` variable
// If query_mem_limit is <= 0, it would set to -1, which means no limit
auto* parent_mem_tracker = wg->mem_tracker();
auto per_instance_mem_limit = query_options.__isset.mem_limit ? query_options.mem_limit : -1;
auto option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
int64_t query_mem_limit = _query_ctx->compute_query_mem_limit(parent_mem_tracker->limit(), per_instance_mem_limit,
degree_of_parallelism, option_query_mem_limit);
int64_t option_query_mem_limit = query_options.__isset.query_mem_limit ? query_options.query_mem_limit : -1;
if (option_query_mem_limit <= 0) option_query_mem_limit = -1;
int64_t big_query_mem_limit = wg->use_big_query_mem_limit() ? wg->big_query_mem_limit() : -1;
int64_t spill_mem_limit_bytes = -1;
if (query_options.__isset.enable_spill && query_options.enable_spill == true) {
spill_mem_limit_bytes = query_mem_limit * query_options.spill_mem_limit_threshold;
if (query_options.__isset.enable_spill && query_options.enable_spill && option_query_mem_limit > 0) {
spill_mem_limit_bytes = option_query_mem_limit * query_options.spill_mem_limit_threshold;
}
_query_ctx->init_mem_tracker(query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
_query_ctx->init_mem_tracker(option_query_mem_limit, parent_mem_tracker, big_query_mem_limit, spill_mem_limit_bytes,
wg.get());

auto query_mem_tracker = _query_ctx->mem_tracker();
Expand Down
24 changes: 0 additions & 24 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,30 +98,6 @@ void QueryContext::cancel(const Status& status) {
_fragment_mgr->cancel(status);
}

int64_t QueryContext::compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit,
size_t pipeline_dop, int64_t option_query_mem_limit) {
// no mem_limit
if (per_instance_mem_limit <= 0 && option_query_mem_limit <= 0) {
return -1;
}

int64_t mem_limit;
if (option_query_mem_limit > 0) {
mem_limit = option_query_mem_limit;
} else {
mem_limit = per_instance_mem_limit;
// query's mem_limit = per-instance mem_limit * num_instances * pipeline_dop
static constexpr int64_t MEM_LIMIT_MAX = std::numeric_limits<int64_t>::max();
if (MEM_LIMIT_MAX / total_fragments() / pipeline_dop > mem_limit) {
mem_limit *= static_cast<int64_t>(total_fragments()) * pipeline_dop;
} else {
mem_limit = MEM_LIMIT_MAX;
}
}

return mem_limit;
}

void QueryContext::init_mem_tracker(int64_t query_mem_limit, MemTracker* parent, int64_t big_query_mem_limit,
int64_t spill_mem_limit, workgroup::WorkGroup* wg) {
std::call_once(_init_mem_tracker_once, [=]() {
Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
DCHECK(_desc_tbl != nullptr);
return _desc_tbl;
}
// If option_query_mem_limit > 0, use it directly.
// Otherwise, use per_instance_mem_limit * num_fragments * pipeline_dop.
int64_t compute_query_mem_limit(int64_t parent_mem_limit, int64_t per_instance_mem_limit, size_t pipeline_dop,
int64_t option_query_mem_limit);

size_t total_fragments() { return _total_fragments; }
/// Initialize the mem_tracker of this query.
/// Positive `big_query_mem_limit` and non-null `wg` indicate
Expand Down
1 change: 0 additions & 1 deletion be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ set(EXEC_FILES
./exec/pipeline/pipeline_file_scan_node_test.cpp
./exec/pipeline/pipeline_test_base.cpp
./exec/pipeline/query_context_manger_test.cpp
./exec/pipeline/query_context_test.cpp
./exec/pipeline/table_function_operator_test.cpp
./exec/query_cache/query_cache_test.cpp
./exec/query_cache/transform_operator.cpp
Expand Down
51 changes: 0 additions & 51 deletions be/test/exec/pipeline/query_context_test.cpp

This file was deleted.

15 changes: 6 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

/**
* The mem limit of query on BE. It takes effects only when enabling pipeline engine.
* - If `query_mem_limit` > 0, use it to limit the memory of a query.
* The memory a query able to be used is just `query_mem_limit`.
* - Otherwise, use `exec_mem_limit` to limit the memory of a query.
* The memory a query able to be used is `exec_mem_limit * num_fragments * pipeline_dop`.
* To maintain compatibility, the default value is 0.
* If `query_mem_limit` > 0, use it to limit the memory of a query.
* Otherwise, no limitation
Comment on lines +100 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if set query_mem_limit = 1, can a query run?

*/
public static final String QUERY_MEM_LIMIT = "query_mem_limit";

Expand Down Expand Up @@ -681,7 +678,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN)
private boolean enableSharedScan = false;

// max memory used on every backend.
// max memory used on each fragment instance
// NOTE: only used for non-pipeline engine and stream_load
// The pipeline engine uses the query_mem_limit
public static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L;
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, flag = VariableMgr.INVISIBLE)
public long maxExecMemByte = DEFAULT_EXEC_MEM_LIMIT;
Expand Down Expand Up @@ -2932,9 +2931,7 @@ public void setSkewJoinRandRange(int skewJoinRandRange) {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMem_limit(maxExecMemByte);
if (queryMemLimit > 0) {
tResult.setQuery_mem_limit(queryMemLimit);
}
tResult.setQuery_mem_limit(queryMemLimit);

// Avoid integer overflow
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void testFromBrokerLoadJobSpec() throws Exception {
jobSpec = coordinator.getJobSpec();
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
}

@Test
Expand Down Expand Up @@ -289,7 +289,7 @@ public void testFromStreamLoadJobSpec() throws Exception {
jobSpec = coordinator.getJobSpec();
Assert.assertEquals(loadMemLimit, jobSpec.getQueryOptions().getLoad_mem_limit());
Assert.assertTrue(jobSpec.getQueryOptions().isSetMem_limit());
Assert.assertFalse(jobSpec.getQueryOptions().isSetQuery_mem_limit());
Assert.assertTrue(jobSpec.getQueryOptions().isSetQuery_mem_limit());
}

@Test
Expand Down