Skip to content

Commit 6eee96f

Browse files
authored
[Enhancement] Skip read pindex when we pick all rowsets to do compaction for primary key table(backport #36819) (#37436)
Why I'm doing: In the last step of compaction for a primary key table, we need to update the pindex, which means reading the pindex first and then updating it. If compaction picks all rowsets, reading the pindex is unnecessary and we can rebuild the pindex directly, which reduces disk read I/O. What I'm doing: Skip the pindex read and rebuild the pindex directly if compaction picks all rowsets. Signed-off-by: zhangqiang <[email protected]>
1 parent 6b3185b commit 6eee96f

File tree

6 files changed

+187
-27
lines changed

6 files changed

+187
-27
lines changed

be/src/common/config.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ CONF_mInt64(max_compaction_candidate_num, "40960");
308308

309309
CONF_mInt32(update_compaction_check_interval_seconds, "10");
310310
CONF_mInt32(update_compaction_num_threads_per_disk, "1");
311-
CONF_Int32(update_compaction_per_tablet_min_interval_seconds, "120"); // 2min
311+
CONF_mInt32(update_compaction_per_tablet_min_interval_seconds, "120"); // 2min
312312
CONF_mInt64(max_update_compaction_num_singleton_deltas, "1000");
313313
CONF_mInt64(update_compaction_size_threshold, "268435456");
314314
CONF_mInt64(update_compaction_result_bytes, "1073741824");
@@ -987,6 +987,9 @@ CONF_mInt64(pindex_major_compaction_schedule_interval_seconds, "15");
987987
// enable persistent index compression
988988
CONF_mBool(enable_pindex_compression, "false");
989989

990+
// If primary key compaction picks all rowsets, we can rebuild the pindex directly and skip reading the existing index.
991+
CONF_mBool(enable_pindex_rebuild_in_compaction, "false");
992+
990993
// Used by query cache, cache entries are evicted when it exceeds its capacity(500MB in default)
991994
CONF_Int64(query_cache_capacity, "536870912");
992995

be/src/storage/persistent_index.cpp

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2076,6 +2076,17 @@ void ShardByLengthMutableIndex::clear() {
20762076
}
20772077
}
20782078

2079+
// Opens a writable l0 index file at `path` (mode CREATE_OR_OPEN_WITH_TRUNCATE, synced on close)
// and stores the handle in `_index_file`.
// Returns InternalError if an index file handle is already held by this object.
Status ShardByLengthMutableIndex::create_index_file(std::string& path) {
    const bool already_attached = (_index_file != nullptr);
    if (already_attached) {
        return Status::InternalError(strings::Substitute("l0 index file already exist: $0", _index_file->filename()));
    }

    // The filesystem is resolved from the member `_path`; the file itself is opened at the `path` argument.
    ASSIGN_OR_RETURN(_fs, FileSystem::CreateSharedFromString(_path));

    WritableFileOptions file_opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
    ASSIGN_OR_RETURN(_index_file, _fs->new_writable_file(file_opts, path));
    return Status::OK();
}
2089+
20792090
#ifdef __SSE2__
20802091

20812092
#include <emmintrin.h>
@@ -2962,6 +2973,7 @@ Status PersistentIndex::load_from_tablet(Tablet* tablet) {
29622973
index_meta.clear_l2_versions();
29632974
index_meta.clear_l2_version_merged();
29642975
index_meta.set_key_size(_key_size);
2976+
index_meta.set_size(0);
29652977
index_meta.set_format_version(PERSISTENT_INDEX_VERSION_4);
29662978
lastest_applied_version.to_pb(index_meta.mutable_version());
29672979
MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
@@ -4509,12 +4521,18 @@ Status PersistentIndex::TEST_major_compaction(PersistentIndexMetaPB& index_meta)
45094521
// 2. merge l2 files to new l2 file
45104522
// 3. modify PersistentIndexMetaPB and make this step atomic.
45114523
Status PersistentIndex::major_compaction(Tablet* tablet) {
4524+
if (_cancel_major_compaction) {
4525+
return Status::InternalError("cancel major compaction");
4526+
}
45124527
bool expect_running_state = false;
45134528
if (!_major_compaction_running.compare_exchange_strong(expect_running_state, true)) {
45144529
// already in compaction
45154530
return Status::OK();
45164531
}
4517-
DeferOp defer([&]() { _major_compaction_running.store(false); });
4532+
DeferOp defer([&]() {
4533+
_major_compaction_running.store(false);
4534+
_cancel_major_compaction = false;
4535+
});
45184536
// merge all l2 files
45194537
PersistentIndexMetaPB prev_index_meta;
45204538
RETURN_IF_ERROR(
@@ -4540,6 +4558,9 @@ Status PersistentIndex::major_compaction(Tablet* tablet) {
45404558
// 3. modify PersistentIndexMetaPB and reload index, protected by index lock
45414559
{
45424560
std::lock_guard lg(*tablet->updates()->get_index_lock());
4561+
if (_cancel_major_compaction) {
4562+
return Status::OK();
4563+
}
45434564
PersistentIndexMetaPB index_meta;
45444565
RETURN_IF_ERROR(
45454566
TabletMetaManager::get_persistent_index_meta(tablet->data_dir(), tablet->tablet_id(), &index_meta));
@@ -4618,4 +4639,58 @@ double PersistentIndex::get_write_amp_score() const {
46184639
}
46194640
}
46204641

4642+
// Drops the current index contents and recreates an empty index at `version`, then fills
// `index_meta` so that it describes this empty index (no l1/l2, empty l0 snapshot, no WALs).
// `_lock` is held exclusively for the whole call, and the major-compaction cancel flag is raised.
// Returns the first error from create(), l0 index file creation, or the usage/size reload.
Status PersistentIndex::reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta) {
    std::unique_lock write_guard(_lock);
    // major_compaction() checks this flag before starting and again before it rewrites the index meta.
    _cancel_major_compaction = true;

    // Build a schema made of the key columns only (column ids 0 .. num_key_columns - 1)
    // to compute the fixed encoded key size.
    const TabletSchema& schema = tablet->tablet_schema();
    vector<ColumnId> key_column_ids(schema.num_key_columns());
    for (size_t idx = 0; idx < key_column_ids.size(); ++idx) {
        key_column_ids[idx] = static_cast<ColumnId>(idx);
    }
    auto key_schema = ChunkHelper::convert_schema(schema, key_column_ids);
    size_t fix_size = PrimaryKeyEncoder::get_encoded_fixed_size(key_schema);

    // Release the old l0 (no-op when there is none) before create() runs.
    _l0.reset();
    RETURN_IF_ERROR(create(fix_size, version));

    // Forget every l1/l2 layer and the cached per-key-length statistics.
    _l1_vec.clear();
    _usage_and_size_by_key_length.clear();
    _l1_merged_num.clear();
    _l2_versions.clear();
    _l2_vec.clear();
    _has_l1 = false;
    _dump_snapshot = true;

    std::string l0_file_path = get_l0_index_file_name(_path, version);
    RETURN_IF_ERROR(_l0->create_index_file(l0_file_path));
    RETURN_IF_ERROR(_reload_usage_and_size_by_key_length(0, 0, false));

    // Rewrite the caller's meta from scratch.
    index_meta->clear_l0_meta();
    index_meta->clear_l1_version();
    index_meta->clear_l2_versions();
    index_meta->clear_l2_version_merged();
    index_meta->set_key_size(_key_size);
    index_meta->set_size(0);
    index_meta->set_format_version(PERSISTENT_INDEX_VERSION_4);
    version.to_pb(index_meta->mutable_version());

    // The l0 meta carries no WALs and a zero-length snapshot stamped with `version`.
    MutableIndexMetaPB* l0 = index_meta->mutable_l0_meta();
    l0->clear_wals();
    IndexSnapshotMetaPB* snapshot_meta = l0->mutable_snapshot();
    version.to_pb(snapshot_meta->mutable_version());
    PagePointerPB* snapshot_page = snapshot_meta->mutable_data();
    snapshot_page->set_offset(0);
    snapshot_page->set_size(0);

    return Status::OK();
}
4689+
4690+
void PersistentIndex::reset_cancel_major_compaction() {
4691+
if (!_major_compaction_running.load(std::memory_order_relaxed)) {
4692+
_cancel_major_compaction = false;
4693+
}
4694+
}
4695+
46214696
} // namespace starrocks

be/src/storage/persistent_index.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ class ShardByLengthMutableIndex {
333333

334334
void clear();
335335

336+
Status create_index_file(std::string& path);
337+
336338
static StatusOr<std::unique_ptr<ShardByLengthMutableIndex>> create(size_t key_size, const std::string& path);
337339

338340
private:
@@ -671,6 +673,10 @@ class PersistentIndex {
671673
return res;
672674
}
673675

676+
Status reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta);
677+
678+
void reset_cancel_major_compaction();
679+
674680
protected:
675681
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
676682
const EditVersionWithMerge& min_l2_version);
@@ -776,6 +782,9 @@ class PersistentIndex {
776782

777783
bool _dump_snapshot = false;
778784
bool _flushed = false;
785+
bool _cancel_major_compaction = false;
786+
787+
private:
779788
bool _need_bloom_filter = false;
780789

781790
mutable std::mutex _get_lock;

be/src/storage/primary_index.cpp

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,9 +1276,10 @@ Status PrimaryIndex::insert(uint32_t rssid, const vector<uint32_t>& rowids, cons
12761276
DCHECK(_status.ok() && (_pkey_to_rssid_rowid || _persistent_index));
12771277
if (_persistent_index != nullptr) {
12781278
auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id);
1279-
_insert_into_persistent_index(rssid, rowids, pks);
1279+
return _insert_into_persistent_index(rssid, rowids, pks);
1280+
} else {
1281+
return _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size());
12801282
}
1281-
return _pkey_to_rssid_rowid->insert(rssid, rowids, pks, 0, pks.size());
12821283
}
12831284

12841285
Status PrimaryIndex::insert(uint32_t rssid, uint32_t rowid_start, const Column& pks) {
@@ -1411,4 +1412,41 @@ Status PrimaryIndex::major_compaction(Tablet* tablet) {
14111412
}
14121413
}
14131414

1415+
// Re-initializes this primary index as an empty index for `tablet` without loading existing data.
// A fresh PersistentIndex is created (and reset to `version`, filling `index_meta`) when the tablet
// enables persistent index and the fixed encoded key size is at most 128; otherwise a fresh
// hash index is created. On success the index is marked as loaded.
Status PrimaryIndex::reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta) {
    std::lock_guard<std::mutex> guard(_lock);
    _table_id = tablet->belonged_table_id();
    _tablet_id = tablet->tablet_id();

    // The primary key schema is made of the key columns only (column ids 0 .. num_key_columns - 1).
    const TabletSchema& schema = tablet->tablet_schema();
    vector<ColumnId> key_column_ids(schema.num_key_columns());
    for (size_t idx = 0; idx < key_column_ids.size(); ++idx) {
        key_column_ids[idx] = static_cast<ColumnId>(idx);
    }
    auto key_schema = ChunkHelper::convert_schema(schema, key_column_ids);
    _set_schema(key_schema);

    size_t fix_size = PrimaryKeyEncoder::get_encoded_fixed_size(_pk_schema);

    const bool use_persistent_index = tablet->get_enable_persistent_index() && (fix_size <= 128);
    if (!use_persistent_index) {
        // Drop the previous hash index (if any) before building the replacement.
        _pkey_to_rssid_rowid.reset();
        _pkey_to_rssid_rowid = create_hash_index(_enc_pk_type, _key_size);
    } else {
        // Drop the previous persistent index (if any) before constructing the replacement.
        _persistent_index.reset();
        _persistent_index = std::make_unique<PersistentIndex>(tablet->schema_hash_path());
        RETURN_IF_ERROR(_persistent_index->reset(tablet, version, index_meta));
    }
    _loaded = true;

    return Status::OK();
}
1445+
1446+
void PrimaryIndex::reset_cancel_major_compaction() {
1447+
if (_persistent_index != nullptr) {
1448+
_persistent_index->reset_cancel_major_compaction();
1449+
}
1450+
}
1451+
14141452
} // namespace starrocks

be/src/storage/primary_index.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ class PrimaryIndex {
135135

136136
size_t key_size() { return _key_size; }
137137

138+
Status reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta);
139+
140+
void reset_cancel_major_compaction();
141+
138142
protected:
139143
void _set_schema(const Schema& pk_schema);
140144

be/src/storage/tablet_updates.cpp

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,11 +1642,41 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
16421642
auto index_entry = manager->index_cache().get_or_create(tablet_id);
16431643
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
16441644
auto& index = index_entry->value();
1645-
auto st = index.load(&_tablet);
1646-
manager->index_cache().update_object_size(index_entry, index.memory_usage());
1645+
1646+
Status st;
1647+
PersistentIndexMetaPB index_meta;
1648+
1649+
bool rebuild_index = (version_info.rowsets.size() == 1 && config::enable_pindex_rebuild_in_compaction);
1650+
// only one output rowset, compaction pick all rowsets, so we can skip pindex read and rebuild index
1651+
if (rebuild_index) {
1652+
st = index.reset(&_tablet, version_info.version, &index_meta);
1653+
} else {
1654+
st = index.load(&_tablet);
1655+
}
16471656
// `enable_persistent_index` of the tablet may be changed by alter, so we should get `enable_persistent_index` from the index to
16481657
// avoid inconsistency between persistent index file and PersistentIndexMeta
16491658
bool enable_persistent_index = index.enable_persistent_index();
1659+
if (enable_persistent_index && !rebuild_index) {
1660+
st = TabletMetaManager::get_persistent_index_meta(_tablet.data_dir(), tablet_id, &index_meta);
1661+
if (!st.ok() && !st.is_not_found()) {
1662+
std::string msg = strings::Substitute("get persistent index meta failed: $0 $1", st.to_string(),
1663+
_debug_string(false, true));
1664+
LOG(ERROR) << msg;
1665+
_set_error(msg);
1666+
return;
1667+
}
1668+
}
1669+
1670+
manager->index_cache().update_object_size(index_entry, index.memory_usage());
1671+
// release or remove index entry when function end
1672+
DeferOp index_defer([&]() {
1673+
index.reset_cancel_major_compaction();
1674+
if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) {
1675+
manager->index_cache().remove(index_entry);
1676+
} else {
1677+
manager->index_cache().release(index_entry);
1678+
}
1679+
});
16501680
if (!st.ok()) {
16511681
manager->index_cache().remove(index_entry);
16521682
_compaction_state.reset();
@@ -1708,8 +1738,28 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
17081738
total_rows += pk_col->size();
17091739
uint32_t rssid = rowset_id + i;
17101740
tmp_deletes.clear();
1711-
// replace will not grow hashtable, so don't need to check memory limit
1712-
index.try_replace(rssid, 0, *pk_col, max_src_rssid, &tmp_deletes);
1741+
if (rebuild_index) {
1742+
st = index.insert(rssid, 0, *pk_col);
1743+
if (!st.ok()) {
1744+
_compaction_state.reset();
1745+
std::string msg = strings::Substitute("_apply_compaction_commit error: index isnert failed: $0 $1",
1746+
st.to_string(), debug_string());
1747+
LOG(ERROR) << msg;
1748+
_set_error(msg);
1749+
return;
1750+
}
1751+
} else {
1752+
// replace will not grow hashtable, so don't need to check memory limit
1753+
st = index.try_replace(rssid, 0, *pk_col, max_src_rssid, &tmp_deletes);
1754+
if (!st.ok()) {
1755+
_compaction_state.reset();
1756+
std::string msg = strings::Substitute("_apply_compaction_commit error: index try replace failed: $0 $1",
1757+
st.to_string(), debug_string());
1758+
LOG(ERROR) << msg;
1759+
_set_error(msg);
1760+
return;
1761+
}
1762+
}
17131763
DelVectorPtr dv = std::make_shared<DelVector>();
17141764
if (tmp_deletes.empty()) {
17151765
dv->init(version.major(), nullptr, 0);
@@ -1724,16 +1774,6 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
17241774
_compaction_state.reset();
17251775
int64_t t_index_delvec = MonotonicMillis();
17261776

1727-
PersistentIndexMetaPB index_meta;
1728-
if (enable_persistent_index) {
1729-
st = TabletMetaManager::get_persistent_index_meta(_tablet.data_dir(), tablet_id, &index_meta);
1730-
if (!st.ok() && !st.is_not_found()) {
1731-
std::string msg = strings::Substitute("get persistent index meta failed: $0", st.to_string());
1732-
LOG(ERROR) << msg << " " << _debug_string(false, true);
1733-
_set_error(msg);
1734-
return;
1735-
}
1736-
}
17371777
st = index.commit(&index_meta);
17381778
if (!st.ok()) {
17391779
std::string msg = strings::Substitute("primary index commit failed: $0", st.to_string());
@@ -1780,15 +1820,6 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
17801820
return;
17811821
}
17821822

1783-
// if `enable_persistent_index` of tablet is change(maybe changed by alter table)
1784-
// we should try to remove the index_entry from cache
1785-
// Otherwise index may be used for later commits, keep in cache
1786-
if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) {
1787-
manager->index_cache().remove(index_entry);
1788-
} else {
1789-
manager->index_cache().release(index_entry);
1790-
}
1791-
17921823
{
17931824
// Update the stats of affected rowsets.
17941825
std::lock_guard lg(_rowset_stats_lock);

0 commit comments

Comments
 (0)