Skip to content

Commit a40e2f8

Browse files
[BugFix] Fix the problem of column mode partial update in cross cluster replication (backport #40692) (#40933)
Co-authored-by: xiangguangyxg <[email protected]>
1 parent 8134704 commit a40e2f8

File tree

8 files changed: 121 additions and 14 deletions

be/src/agent/publish_version.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
124124
tablet_span->SetAttribute("txn_id", transaction_id);
125125
tablet_span->SetAttribute("tablet_id", task.tablet_id);
126126
tablet_span->SetAttribute("version", task.version);
127-
if (!task.rowset) {
127+
if (!is_replication_txn && !task.rowset) {
128128
task.st = Status::NotFound(
129129
fmt::format("rowset not found of tablet: {}, txn_id: {}", task.tablet_id, task.txn_id));
130130
LOG(WARNING) << task.st;

be/src/storage/delta_column_group.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class DeltaColumnGroup {
6464
}
6565
return column_files;
6666
}
67+
68+
std::vector<std::vector<uint32_t>>& column_ids() { return _column_ids; }
6769
const std::vector<std::vector<uint32_t>>& column_ids() const { return _column_ids; }
6870
int64_t version() const { return _version; }
6971

be/src/storage/lake/replication_txn_manager.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,10 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot
378378
Status ReplicationTxnManager::convert_rowset_meta(const RowsetMeta& rowset_meta, TTransactionId transaction_id,
379379
TxnLogPB::OpWrite* op_write,
380380
std::unordered_map<std::string, std::string>* filename_map) {
381+
if (rowset_meta.is_column_mode_partial_update()) {
382+
return Status::NotSupported("Column mode partial update is not supported in shared-data mode");
383+
}
384+
381385
// Convert rowset metadata
382386
auto* rowset_metadata = op_write->mutable_rowset();
383387
rowset_metadata->set_id(rowset_meta.get_rowset_seg_id());

be/src/storage/replication_txn_manager.cpp

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,9 @@ Status ReplicationTxnManager::replicate_remote_snapshot(const TReplicateSnapshot
519519
WritableFileOptions opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
520520
ASSIGN_OR_RETURN(auto output_file, fs::new_writable_file(opts, tablet_snapshot_dir_path + file_name));
521521

522-
if (HasSuffixString(file_name, ".dat") && !column_unique_id_map.empty()) {
522+
if (!column_unique_id_map.empty() &&
523+
(HasSuffixString(file_name, ".dat") || HasSuffixString(file_name, ".upt") ||
524+
HasSuffixString(file_name, ".cols"))) {
523525
return std::make_unique<SegmentStreamConverter>(file_name, file_size, std::move(output_file),
524526
&column_unique_id_map);
525527
}
@@ -559,7 +561,10 @@ static Status convert_rowset_meta_pb(RowsetMetaPB* rowset_meta_pb,
559561
Status ReplicationTxnManager::convert_snapshot_for_none_primary(
560562
const std::string& tablet_snapshot_path, std::unordered_map<uint32_t, uint32_t>* column_unique_id_map,
561563
const TReplicateSnapshotRequest& request) {
562-
std::string src_header_file_path = tablet_snapshot_path + std::to_string(request.src_tablet_id) + ".hdr";
564+
std::string src_tablet_id_path = tablet_snapshot_path + std::to_string(request.src_tablet_id);
565+
std::string src_header_file_path = src_tablet_id_path + ".hdr";
566+
std::string src_dcgs_snapshot_file_path = src_tablet_id_path + ".dcgs_snapshot";
567+
563568
TabletMeta tablet_meta;
564569
RETURN_IF_ERROR(tablet_meta.create_from_file(src_header_file_path));
565570

@@ -589,6 +594,33 @@ Status ReplicationTxnManager::convert_snapshot_for_none_primary(
589594
}
590595
}
591596

597+
if (fs::path_exist(src_dcgs_snapshot_file_path)) {
598+
DeltaColumnGroupSnapshotPB dcg_snapshot_pb;
599+
RETURN_IF_ERROR(DeltaColumnGroupListHelper::parse_snapshot(src_dcgs_snapshot_file_path, dcg_snapshot_pb));
600+
for (auto& tablet_id : *dcg_snapshot_pb.mutable_tablet_id()) {
601+
tablet_id = request.tablet_id;
602+
}
603+
for (auto& dcg_list : *dcg_snapshot_pb.mutable_dcg_lists()) {
604+
for (auto& dcg : *dcg_list.mutable_dcgs()) {
605+
for (auto& dcg_column_ids : *dcg.mutable_column_ids()) {
606+
RETURN_IF_ERROR(ReplicationUtils::convert_column_unique_ids(dcg_column_ids.mutable_column_ids(),
607+
*column_unique_id_map));
608+
}
609+
}
610+
}
611+
612+
std::string dcgs_snapshot_file_path =
613+
tablet_snapshot_path + std::to_string(request.tablet_id) + ".dcgs_snapshot";
614+
RETURN_IF_ERROR(DeltaColumnGroupListHelper::save_snapshot(dcgs_snapshot_file_path, dcg_snapshot_pb));
615+
616+
if (request.tablet_id != request.src_tablet_id) {
617+
auto status = fs::delete_file(src_dcgs_snapshot_file_path);
618+
if (!status.ok()) {
619+
LOG(WARNING) << "Failed to delete file: " << src_dcgs_snapshot_file_path << ", " << status;
620+
}
621+
}
622+
}
623+
592624
RETURN_IF_ERROR(SnapshotManager::instance()->convert_rowset_ids(tablet_snapshot_path, request.tablet_id,
593625
request.schema_hash));
594626
return Status::OK();
@@ -616,6 +648,14 @@ Status ReplicationTxnManager::convert_snapshot_for_primary(const std::string& ta
616648
RETURN_IF_ERROR(convert_rowset_meta_pb(&rowset_meta, column_unique_id_map, request));
617649
}
618650

651+
for (auto& [segment_id, dcg_list] : snapshot_meta.delta_column_groups()) {
652+
for (auto& dcg : dcg_list) {
653+
for (auto& dcg_column_ids : dcg->column_ids()) {
654+
RETURN_IF_ERROR(ReplicationUtils::convert_column_unique_ids(&dcg_column_ids, *column_unique_id_map));
655+
}
656+
}
657+
}
658+
619659
RETURN_IF_ERROR(snapshot_meta.serialize_to_file(snapshot_meta_file_path));
620660

621661
RETURN_IF_ERROR(SnapshotManager::instance()->assign_new_rowset_id(&snapshot_meta, tablet_snapshot_path));

be/src/storage/replication_utils.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -336,14 +336,9 @@ StatusOr<std::string> ReplicationUtils::download_remote_snapshot_file(
336336

337337
Status ReplicationUtils::convert_rowset_txn_meta(RowsetTxnMetaPB* rowset_txn_meta,
338338
const std::unordered_map<uint32_t, uint32_t>& column_unique_id_map) {
339-
for (auto& column_unique_id : *rowset_txn_meta->mutable_partial_update_column_unique_ids()) {
340-
auto iter = column_unique_id_map.find(column_unique_id);
341-
if (iter == column_unique_id_map.end()) {
342-
LOG(ERROR) << "Column not found, column unique id: " << column_unique_id;
343-
return Status::InternalError("Column not found");
344-
}
345-
column_unique_id = iter->second;
346-
}
339+
RETURN_IF_ERROR(convert_column_unique_ids(rowset_txn_meta->mutable_partial_update_column_unique_ids(),
340+
column_unique_id_map));
341+
347342
if (rowset_txn_meta->has_auto_increment_partial_update_column_uid()) {
348343
auto iter = column_unique_id_map.find(rowset_txn_meta->auto_increment_partial_update_column_uid());
349344
if (iter == column_unique_id_map.end()) {

be/src/storage/replication_utils.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ReplicationUtils {
4545
TSchemaHash remote_schema_hash,
4646
const std::string& file_name, uint64_t timeout_sec);
4747

48-
static constexpr uint32_t kFakeColumnUniqueId = -1;
48+
static constexpr uint32_t kFakeColumnUniqueId = INT_MAX;
4949

5050
template <typename T>
5151
static void calc_column_unique_id_map(const T& source_columns, const T& target_columns,
@@ -68,7 +68,7 @@ class ReplicationUtils {
6868
}
6969

7070
if (need_convert) {
71-
column_unique_id_map->emplace(kFakeColumnUniqueId, 0);
71+
column_unique_id_map->emplace(kFakeColumnUniqueId, INT_MAX);
7272
} else {
7373
column_unique_id_map->clear();
7474
}
@@ -86,10 +86,29 @@ class ReplicationUtils {
8686
column.set_unique_id(iter->second);
8787
} else {
8888
uint32_t column_unique_id = --column_unique_id_map->operator[](kFakeColumnUniqueId);
89-
column.set_unique_id(column_unique_id);
9089
column_unique_id_map->emplace(column.unique_id(), column_unique_id);
90+
column.set_unique_id(column_unique_id);
91+
}
92+
}
93+
}
94+
95+
template <typename T>
96+
static Status convert_column_unique_ids(T* column_unique_ids,
97+
const std::unordered_map<uint32_t, uint32_t>& column_unique_id_map) {
98+
if (column_unique_id_map.empty()) {
99+
return Status::OK();
100+
}
101+
102+
for (auto& column_unique_id : *column_unique_ids) {
103+
auto iter = column_unique_id_map.find(column_unique_id);
104+
if (iter == column_unique_id_map.end()) {
105+
LOG(ERROR) << "Column not found, column unique id: " << column_unique_id;
106+
return Status::InternalError("Column not found");
91107
}
108+
column_unique_id = iter->second;
92109
}
110+
111+
return Status::OK();
93112
}
94113

95114
static Status convert_rowset_txn_meta(RowsetTxnMetaPB* rowset_txn_meta,

be/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ set(EXEC_FILES
279279
./storage/push_handler_test.cpp
280280
./storage/range_test.cpp
281281
./storage/replication_txn_manager_test.cpp
282+
./storage/replication_utils_test.cpp
282283
./storage/segment_stream_converter_test.cpp
283284
./storage/row_source_mask_test.cpp
284285
./storage/union_iterator_test.cpp
be/test/storage/replication_utils_test.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2021-present StarRocks, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "storage/replication_utils.h"
16+
17+
#include <gtest/gtest.h>
18+
19+
namespace starrocks {
20+
21+
class ReplicationUtilsTest : public testing::Test {
22+
public:
23+
ReplicationUtilsTest() = default;
24+
~ReplicationUtilsTest() override = default;
25+
26+
void SetUp() override {}
27+
void TearDown() override {}
28+
};
29+
30+
TEST_F(ReplicationUtilsTest, test_convert_column_unique_ids) {
31+
std::vector<uint32_t> column_unique_ids = {1, 2};
32+
std::unordered_map<uint32_t, uint32_t> column_unique_id_map = {{1, 10}, {2, 20}};
33+
34+
auto status = ReplicationUtils::convert_column_unique_ids(&column_unique_ids, column_unique_id_map);
35+
EXPECT_TRUE(status.ok()) << status;
36+
37+
column_unique_id_map.erase(1);
38+
status = ReplicationUtils::convert_column_unique_ids(&column_unique_ids, column_unique_id_map);
39+
EXPECT_FALSE(status.ok()) << status;
40+
41+
column_unique_id_map.clear();
42+
status = ReplicationUtils::convert_column_unique_ids(&column_unique_ids, column_unique_id_map);
43+
EXPECT_TRUE(status.ok()) << status;
44+
}
45+
46+
} // namespace starrocks

0 commit comments

Comments
 (0)