Skip to content

Commit 2dd7859

Browse files
Merge branch 'master' into master-fix12
2 parents 4f5bbf2 + 5676379 commit 2dd7859

File tree

396 files changed

+7555
-1295
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

396 files changed

+7555
-1295
lines changed

be/src/cloud/cloud_base_compaction.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ Status CloudBaseCompaction::execute_compact() {
302302
.tag("output_segments", _output_rowset->num_segments())
303303
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
304304
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
305-
.tag("output_rowset_total_size", _output_rowset->total_disk_size());
305+
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
306+
.tag("local_read_bytes", _local_read_bytes_total)
307+
.tag("remote_read_bytes", _remote_read_bytes_total);
306308

307309
//_compaction_succeed = true;
308310
_state = CompactionState::SUCCESS;

be/src/cloud/cloud_cumulative_compaction.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,9 @@ Status CloudCumulativeCompaction::execute_compact() {
226226
.tag("tablet_max_version", _tablet->max_version_unlocked())
227227
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
228228
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
229-
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
229+
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0))
230+
.tag("local_read_bytes", _local_read_bytes_total)
231+
.tag("remote_read_bytes", _remote_read_bytes_total);
230232

231233
_state = CompactionState::SUCCESS;
232234

be/src/cloud/cloud_full_compaction.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,9 @@ Status CloudFullCompaction::execute_compact() {
182182
.tag("output_segments", _output_rowset->num_segments())
183183
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
184184
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
185-
.tag("output_rowset_total_size", _output_rowset->total_disk_size());
185+
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
186+
.tag("local_read_bytes", _local_read_bytes_total)
187+
.tag("remote_read_bytes", _remote_read_bytes_total);
186188

187189
_state = CompactionState::SUCCESS;
188190

be/src/cloud/cloud_storage_engine.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "olap/storage_policy.h"
5959
#include "runtime/memory/cache_manager.h"
6060
#include "util/parse_util.h"
61+
#include "util/time.h"
6162
#include "vec/common/assert_cast.h"
6263

6364
namespace doris {
@@ -445,14 +446,14 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() {
445446

446447
int64_t interval = config::generate_compaction_tasks_interval_ms;
447448
do {
449+
int64_t cur_time = UnixMillis();
448450
if (!config::disable_auto_compaction) {
449451
Status st = _adjust_compaction_thread_num();
450452
if (!st.ok()) {
451453
break;
452454
}
453455

454456
bool check_score = false;
455-
int64_t cur_time = UnixMillis();
456457
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
457458
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
458459
round++;
@@ -504,6 +505,9 @@ void CloudStorageEngine::_compaction_tasks_producer_callback() {
504505
} else {
505506
interval = config::check_auto_compaction_interval_seconds * 1000;
506507
}
508+
int64_t end_time = UnixMillis();
509+
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
510+
cur_time);
507511
} while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
508512
}
509513

be/src/cloud/cloud_tablet.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "cloud/cloud_tablet.h"
1919

20+
#include <bvar/bvar.h>
2021
#include <gen_cpp/olap_file.pb.h>
2122
#include <rapidjson/document.h>
2223
#include <rapidjson/encodings.h>
@@ -57,6 +58,11 @@ namespace doris {
5758
#include "common/compile_check_begin.h"
5859
using namespace ErrorCode;
5960

61+
bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
62+
"cu_compaction_get_delete_bitmap_lock_time_ms");
63+
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
64+
"base_compaction_get_delete_bitmap_lock_time_ms");
65+
6066
static constexpr int LOAD_INITIATOR_ID = -1;
6167

6268
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
@@ -991,6 +997,11 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
991997
RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
992998
*this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
993999
int64_t t2 = MonotonicMicros();
1000+
if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
1001+
g_cu_compaction_get_delete_bitmap_lock_time_ms << (t2 - t1) / 1000;
1002+
} else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
1003+
g_base_compaction_get_delete_bitmap_lock_time_ms << (t2 - t1) / 1000;
1004+
}
9941005
get_delete_bitmap_lock_start_time = t2;
9951006
RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
9961007
int64_t t3 = MonotonicMicros();

be/src/common/config.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -614,9 +614,9 @@ DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000");
614614
DEFINE_mDouble(olap_table_sink_send_interval_auto_partition_factor, "0.001");
615615

616616
// Fragment thread pool
617-
DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_min, "16");
618-
DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_max, "512");
619-
DEFINE_Int32(fragment_mgr_asynic_work_pool_queue_size, "4096");
617+
DEFINE_Int32(fragment_mgr_async_work_pool_thread_num_min, "16");
618+
DEFINE_Int32(fragment_mgr_async_work_pool_thread_num_max, "512");
619+
DEFINE_Int32(fragment_mgr_async_work_pool_queue_size, "4096");
620620

621621
// Control the number of disks on the machine. If 0, this comes from the system settings.
622622
DEFINE_Int32(num_disks, "0");

be/src/common/config.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -650,9 +650,9 @@ DECLARE_mInt32(olap_table_sink_send_interval_microseconds);
650650
DECLARE_mDouble(olap_table_sink_send_interval_auto_partition_factor);
651651

652652
// Fragment thread pool
653-
DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_min);
654-
DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_max);
655-
DECLARE_Int32(fragment_mgr_asynic_work_pool_queue_size);
653+
DECLARE_Int32(fragment_mgr_async_work_pool_thread_num_min);
654+
DECLARE_Int32(fragment_mgr_async_work_pool_thread_num_max);
655+
DECLARE_Int32(fragment_mgr_async_work_pool_queue_size);
656656

657657
// Control the number of disks on the machine. If 0, this comes from the system settings.
658658
DECLARE_Int32(num_disks);

be/src/exec/es/es_scroll_parser.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,8 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
847847
break;
848848
}
849849
case TYPE_JSONB: {
850-
JsonBinaryValue binary_val(json_value_to_string(col));
850+
JsonBinaryValue binary_val;
851+
RETURN_IF_ERROR(binary_val.init_from_json_string(json_value_to_string(col)));
851852
vectorized::JsonbField json(binary_val.value(), binary_val.size());
852853
col_ptr->insert(vectorized::Field::create_field<TYPE_JSONB>(json));
853854
break;

be/src/exec/rowid_fetcher.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,9 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
173173
}
174174
}
175175
for (int i = 0; i < resp.binary_row_data_size(); ++i) {
176-
vectorized::JsonbSerializeUtil::jsonb_to_block(
176+
RETURN_IF_ERROR(vectorized::JsonbSerializeUtil::jsonb_to_block(
177177
serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(),
178-
col_uid_to_idx, *output_block, default_values, {});
178+
col_uid_to_idx, *output_block, default_values, {}));
179179
}
180180
return Status::OK();
181181
}
@@ -797,10 +797,10 @@ Status RowIdStorageReader::read_doris_format_row(
797797
},
798798
lookup_row_data_ms));
799799

800-
vectorized::JsonbSerializeUtil::jsonb_to_block(
800+
RETURN_IF_ERROR(vectorized::JsonbSerializeUtil::jsonb_to_block(
801801
row_store_read_struct.serdes, row_store_read_struct.row_store_buffer.data(),
802802
row_store_read_struct.row_store_buffer.size(), row_store_read_struct.col_uid_to_idx,
803-
result_block, row_store_read_struct.default_values, {});
803+
result_block, row_store_read_struct.default_values, {}));
804804
} else {
805805
for (int x = 0; x < slots.size(); ++x) {
806806
vectorized::MutableColumnPtr column =

be/src/exec/schema_scanner.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "exec/schema_scanner/schema_active_queries_scanner.h"
3030
#include "exec/schema_scanner/schema_backend_active_tasks.h"
31+
#include "exec/schema_scanner/schema_backend_configuration_scanner.h"
3132
#include "exec/schema_scanner/schema_backend_kerberos_ticket_cache.h"
3233
#include "exec/schema_scanner/schema_catalog_meta_cache_stats_scanner.h"
3334
#include "exec/schema_scanner/schema_charsets_scanner.h"
@@ -69,7 +70,6 @@
6970
#include "vec/columns/column_nullable.h"
7071
#include "vec/columns/column_string.h"
7172
#include "vec/columns/column_vector.h"
72-
#include "vec/columns/columns_number.h"
7373
#include "vec/common/string_ref.h"
7474
#include "vec/core/block.h"
7575
#include "vec/core/column_with_type_and_name.h"
@@ -195,6 +195,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
195195
return SchemaFilesScanner::create_unique();
196196
case TSchemaTableType::SCH_PARTITIONS:
197197
return SchemaPartitionsScanner::create_unique();
198+
case TSchemaTableType::SCH_BACKEND_CONFIGURATION:
199+
return SchemaBackendConfigurationScanner::create_unique();
198200
case TSchemaTableType::SCH_ROWSETS:
199201
return SchemaRowsetsScanner::create_unique();
200202
case TSchemaTableType::SCH_METADATA_NAME_IDS:
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exec/schema_scanner/schema_backend_configuration_scanner.h"
19+
20+
#include <gen_cpp/Descriptors_types.h>
21+
22+
#include <string>
23+
24+
#include "runtime/define_primitive_type.h"
25+
#include "runtime/exec_env.h"
26+
#include "runtime/runtime_state.h"
27+
#include "vec/common/string_ref.h"
28+
#include "vec/core/block.h"
29+
30+
namespace doris {
31+
32+
std::vector<SchemaScanner::ColumnDesc> SchemaBackendConfigurationScanner::_s_tbls_columns = {
33+
// name, type, size, is_null
34+
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), true},
35+
{"CONFIG_NAME", TYPE_STRING, sizeof(StringRef), true},
36+
{"CONFIG_TYPE", TYPE_STRING, sizeof(StringRef), true},
37+
{"CONFIG_VALUE", TYPE_STRING, sizeof(StringRef), true},
38+
{"IS_MUTABLE", TYPE_BOOLEAN, sizeof(bool), true}};
39+
40+
SchemaBackendConfigurationScanner::SchemaBackendConfigurationScanner()
41+
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BACKEND_CONFIGURATION),
42+
_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id) {}
43+
44+
SchemaBackendConfigurationScanner::~SchemaBackendConfigurationScanner() = default;
45+
46+
Status SchemaBackendConfigurationScanner::start(doris::RuntimeState* state) {
47+
_config_infos = config::get_config_info();
48+
return Status::OK();
49+
}
50+
51+
Status SchemaBackendConfigurationScanner::get_next_block_internal(vectorized::Block* block,
52+
bool* eos) {
53+
if (!_is_init) {
54+
return Status::InternalError("Used before initialized.");
55+
}
56+
57+
if (nullptr == block || nullptr == eos) {
58+
return Status::InternalError("input pointer is nullptr.");
59+
}
60+
61+
*eos = true;
62+
if (_config_infos.empty()) {
63+
return Status::OK();
64+
}
65+
66+
for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
67+
size_t row_num = _config_infos.size();
68+
std::vector<StringRef> str_refs(row_num);
69+
std::vector<int8_t> bool_vals(row_num);
70+
std::vector<void*> datas(row_num);
71+
std::vector<std::string> column_values(row_num);
72+
73+
for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
74+
// be_id
75+
if (col_idx == 0) {
76+
datas[row_idx] = &_backend_id;
77+
} else {
78+
// config
79+
const auto& row = _config_infos[row_idx];
80+
if (row.size() != _s_tbls_columns.size() - 1) {
81+
return Status::InternalError(
82+
"backend configs info meet invalid schema, schema_size={}, "
83+
"input_data_size={}",
84+
_config_infos.size(), row.size());
85+
}
86+
87+
std::string& column_value =
88+
column_values[row_idx]; // Reference to the actual string in the vector
89+
column_value = row[col_idx - 1];
90+
if (_s_tbls_columns[col_idx].type == TYPE_BOOLEAN) {
91+
bool_vals[row_idx] = column_value == "true" ? 1 : 0;
92+
datas[row_idx] = &bool_vals[row_idx];
93+
} else {
94+
str_refs[row_idx] =
95+
StringRef(column_values[row_idx].data(), column_values[row_idx].size());
96+
datas[row_idx] = &str_refs[row_idx];
97+
}
98+
}
99+
}
100+
RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
101+
}
102+
return Status::OK();
103+
}
104+
} // namespace doris
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <vector>
21+
22+
#include "common/status.h"
23+
#include "exec/schema_scanner.h"
24+
25+
namespace doris {
26+
class RuntimeState;
27+
namespace vectorized {
28+
class Block;
29+
} // namespace vectorized
30+
31+
class SchemaBackendConfigurationScanner : public SchemaScanner {
32+
ENABLE_FACTORY_CREATOR(SchemaBackendConfigurationScanner);
33+
34+
public:
35+
SchemaBackendConfigurationScanner();
36+
~SchemaBackendConfigurationScanner() override;
37+
38+
Status start(RuntimeState* state) override;
39+
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
40+
41+
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
42+
43+
private:
44+
int64_t _backend_id;
45+
std::vector<std::vector<std::string>> _config_infos;
46+
};
47+
} // namespace doris

be/src/exec/schema_scanner/schema_helper.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,13 @@ Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t
151151
});
152152
}
153153

154+
Status SchemaHelper::fetch_schema_table_data(const std::string& ip, const int32_t port,
155+
const TFetchSchemaTableDataRequest& request,
156+
TFetchSchemaTableDataResult* result) {
157+
return ThriftRpcHelper::rpc<FrontendServiceClient>(
158+
ip, port, [&request, &result](FrontendServiceConnection& client) {
159+
client->fetchSchemaTableData(*result, request);
160+
});
161+
}
162+
154163
} // namespace doris

be/src/exec/schema_scanner/schema_helper.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class TDescribeTablesParams;
2828
class TDescribeTablesResult;
2929
class TFetchRoutineLoadJobRequest;
3030
class TFetchRoutineLoadJobResult;
31+
class TFetchSchemaTableDataRequest;
32+
class TFetchSchemaTableDataResult;
3133
class TGetDbsParams;
3234
class TGetDbsResult;
3335
class TGetTablesParams;
@@ -90,6 +92,10 @@ class SchemaHelper {
9092
static Status fetch_routine_load_job(const std::string& ip, const int32_t port,
9193
const TFetchRoutineLoadJobRequest& request,
9294
TFetchRoutineLoadJobResult* result);
95+
96+
static Status fetch_schema_table_data(const std::string& ip, const int32_t port,
97+
const TFetchSchemaTableDataRequest& request,
98+
TFetchSchemaTableDataResult* result);
9399
};
94100

95101
} // namespace doris

0 commit comments

Comments
 (0)