Skip to content

Commit dd511b4

Browse files
authored
[Feature] Support disk disable/decommission (part1) (#37134)
Signed-off-by: gengjun-git <[email protected]>
1 parent bf2b96e commit dd511b4

35 files changed

+955
-246
lines changed

be/src/agent/heartbeat_server.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, const TMaste
109109
// nothing to do
110110
}
111111

112+
if (master_info.__isset.disabled_disks) {
113+
_olap_engine->disable_disks(master_info.disabled_disks);
114+
}
115+
116+
if (master_info.__isset.decommissioned_disks) {
117+
_olap_engine->decommission_disks(master_info.decommissioned_disks);
118+
}
119+
112120
static auto num_hardware_cores = static_cast<int32_t>(CpuInfo::num_cores());
113121
if (res.ok()) {
114122
heartbeat_result.backend_info.__set_be_port(config::be_port);

be/src/storage/data_dir.cpp

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ DataDir::DataDir(const std::string& path, TStorageMedium::type storage_medium, T
7272
_available_bytes(0),
7373
_disk_capacity_bytes(0),
7474
_storage_medium(storage_medium),
75-
_is_used(false),
7675
_tablet_manager(tablet_manager),
7776
_txn_manager(txn_manager),
7877
_cluster_id_mgr(std::make_shared<ClusterIdMgr>(path)),
@@ -99,7 +98,7 @@ Status DataDir::init(bool read_only) {
9998
RETURN_IF_ERROR_WITH_WARN(_init_meta(read_only), "_init_meta failed");
10099
RETURN_IF_ERROR_WITH_WARN(init_persistent_index_dir(), "_init_persistent_index_dir failed");
101100

102-
_is_used = true;
101+
_state = DiskState::ONLINE;
103102
return Status::OK();
104103
}
105104

@@ -150,15 +149,26 @@ Status DataDir::set_cluster_id(int32_t cluster_id) {
150149
}
151150

152151
// Probes this store with _read_and_write_test_file() and updates _state from the outcome.
// In the added (+) version below:
//   - nothing is done when _state is DECOMMISSIONED or DISABLED;
//   - the probe is attempted up to retry_times times;
//   - the state becomes OFFLINE only if every attempt fails with an IO error,
//     otherwise it becomes ONLINE.
// NOTE(review): _state is also written through set_state() by StorageEngine::disable_disks /
// decommission_disks; no synchronization on _state is visible in this function — confirm
// that a concurrent DISABLED/DECOMMISSIONED update cannot be overwritten with ONLINE here.
void DataDir::health_check() {
152+
const int retry_times = 10;
153153
// check disk
154-
if (_is_used) {
155-
Status res = _read_and_write_test_file();
156-
if (!res.ok()) {
157-
LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path << ", res=" << res.to_string();
158-
if (is_io_error(res)) {
159-
_is_used = false;
154+
if (_state != DiskState::DECOMMISSIONED && _state != DiskState::DISABLED) {
155+
bool all_failed = true;
156+
// The attempts run back-to-back; no wait between retries is visible here.
for (int i = 0; i < retry_times; i++) {
157+
Status res = _read_and_write_test_file();
158+
// A success, or a failure that is not an IO error, ends the retry loop and counts as healthy.
if (res.ok() || !is_io_error(res)) {
159+
all_failed = false;
160+
break;
161+
} else {
162+
LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path
163+
<< ", res=" << res.to_string();
160164
}
161165
}
166+
// Only retry_times consecutive IO errors take the disk OFFLINE; any other outcome
// (re)marks it ONLINE, which also brings a previously OFFLINE disk back.
if (all_failed) {
167+
LOG(WARNING) << "store test failed " << retry_times << " times, set _state to OFFLINE. path=" << _path;
168+
_state = DiskState::OFFLINE;
169+
} else {
170+
_state = DiskState::ONLINE;
171+
}
162172
}
163173
}
164174

be/src/storage/data_dir.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ class Tablet;
5656
class TabletManager;
5757
class TxnManager;
5858

59+
// State of a data directory (disk), stored in DataDir::_state.
// DataDir::is_used() treats ONLINE and DECOMMISSIONED as usable; DataDir::init() sets
// ONLINE on success.
enum DiskState {
60+
ONLINE,
61+
OFFLINE, // detected by health_check, tablets on OFFLINE disk will be dropped.
62+
DISABLED, // set by user, tablets on DISABLED disk will be dropped.
63+
DECOMMISSIONED // set by user, tablets on DECOMMISSIONED disk will be migrated to other disks.
64+
};
65+
5966
// A DataDir used to manage data in same path.
6067
// Now, After DataDir was created, it will never be deleted for easy implementation.
6168
class DataDir {
@@ -72,8 +79,9 @@ class DataDir {
7279

7380
const std::string& path() const { return _path; }
7481
int64_t path_hash() const { return _path_hash; }
75-
bool is_used() const { return _is_used; }
76-
void set_is_used(bool is_used) { _is_used = is_used; }
82+
// True when the disk is ONLINE or DECOMMISSIONED; OFFLINE and DISABLED disks are not "used".
// get_dir_info() reports this value as DataDirInfo::is_used.
bool is_used() const { return _state == DiskState::ONLINE || _state == DiskState::DECOMMISSIONED; }
83+
// Returns the current disk state.
DiskState get_state() const { return _state; }
84+
// Overwrites the current disk state; no validation of the transition is performed here.
void set_state(DiskState state) { _state = state; }
7785
int32_t cluster_id() const { return _cluster_id_mgr->cluster_id(); }
7886

7987
DataDirInfo get_dir_info() {
@@ -82,7 +90,7 @@ class DataDir {
8290
info.path_hash = _path_hash;
8391
info.disk_capacity = _disk_capacity_bytes;
8492
info.available = _available_bytes;
85-
info.is_used = _is_used;
93+
info.is_used = is_used();
8694
info.storage_medium = _storage_medium;
8795
return info;
8896
}
@@ -170,7 +178,7 @@ class DataDir {
170178
// the actual capacity of the disk of this data dir
171179
int64_t _disk_capacity_bytes;
172180
TStorageMedium::type _storage_medium;
173-
bool _is_used;
181+
DiskState _state;
174182

175183
TabletManager* _tablet_manager;
176184
TxnManager* _txn_manager;

be/src/storage/storage_engine.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(TStorageMedium
473473
{
474474
std::lock_guard<std::mutex> l(_store_lock);
475475
for (auto& it : _store_map) {
476-
if (it.second->is_used()) {
476+
if (it.second->get_state() == DiskState::ONLINE) {
477477
if (_available_storage_medium_type_count == 1 || it.second->storage_medium() == storage_medium) {
478478
if (!it.second->capacity_limit_reached(0)) {
479479
stores.push_back(it.second);
@@ -1541,4 +1541,38 @@ void StorageEngine::clear_rowset_delta_column_group_cache(const Rowset& rowset)
15411541
}());
15421542
}
15431543

1544+
// Applies the list of disabled disk paths to every store in _store_map, holding _store_lock
// for the whole pass.
//   - A store whose path() appears in disabled_disks is set to DISABLED, whatever its
//     previous state was.
//   - A store that is not listed but is currently DISABLED is set back to ONLINE.
//   - Stores in any other state that are not listed are left untouched.
// Invoked from HeartbeatServer::heartbeat when master_info.disabled_disks is set.
// NOTE(review): the "is set to DISABLED" message is emitted on every call for each listed
// disk, even when the state does not change — confirm this is acceptable for a
// heartbeat-driven call.
void StorageEngine::disable_disks(const std::vector<string>& disabled_disks) {
1545+
std::lock_guard<std::mutex> l(_store_lock);
1546+
1547+
for (auto& it : _store_map) {
1548+
if (std::find(disabled_disks.begin(), disabled_disks.end(), it.second->path()) != disabled_disks.end()) {
1549+
it.second->set_state(DiskState::DISABLED);
1550+
LOG(INFO) << "disk " << it.second->path() << " is set to DISABLED";
1551+
} else {
1552+
// Not listed any more: undo a previous disable, but do not touch other states.
if (it.second->get_state() == DiskState::DISABLED) {
1553+
it.second->set_state(DiskState::ONLINE);
1554+
LOG(INFO) << "disk " << it.second->path() << " is recovered to ONLINE, previous state is DISABLED";
1555+
}
1556+
}
1557+
}
1558+
}
1559+
1560+
// Applies the list of decommissioned disk paths to every store in _store_map, holding
// _store_lock for the whole pass.
//   - A store whose path() appears in decommission_disks is set to DECOMMISSIONED, whatever
//     its previous state was.
//   - A store that is not listed but is currently DECOMMISSIONED is set back to ONLINE.
//   - Stores in any other state that are not listed are left untouched.
// Invoked from HeartbeatServer::heartbeat (after disable_disks) when
// master_info.decommissioned_disks is set.
// NOTE(review): a path present in both lists ends up DECOMMISSIONED because this runs second,
// and an OFFLINE disk that is listed becomes DECOMMISSIONED (which is_used() reports as
// usable) — confirm the sender guarantees these cases cannot occur.
void StorageEngine::decommission_disks(const std::vector<string>& decommission_disks) {
1561+
std::lock_guard<std::mutex> l(_store_lock);
1562+
1563+
for (auto& it : _store_map) {
1564+
if (std::find(decommission_disks.begin(), decommission_disks.end(), it.second->path()) !=
1565+
decommission_disks.end()) {
1566+
it.second->set_state(DiskState::DECOMMISSIONED);
1567+
LOG(INFO) << "disk " << it.second->path() << " is set to DECOMMISSIONED";
1568+
} else {
1569+
// Not listed any more: undo a previous decommission, but do not touch other states.
if (it.second->get_state() == DiskState::DECOMMISSIONED) {
1570+
it.second->set_state(DiskState::ONLINE);
1571+
LOG(INFO) << "disk " << it.second->path()
1572+
<< " is recovered to ONLINE, previous state is DECOMMISSIONED";
1573+
}
1574+
}
1575+
}
1576+
}
1577+
15441578
} // namespace starrocks

be/src/storage/storage_engine.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ class StorageEngine {
282282

283283
void clear_rowset_delta_column_group_cache(const Rowset& rowset);
284284

285+
void disable_disks(const std::vector<string>& disabled_disks);
286+
287+
void decommission_disks(const std::vector<string>& decommissioned_disks);
288+
285289
void wake_finish_publish_vesion_thread() {
286290
std::unique_lock<std::mutex> wl(_finish_publish_version_mutex);
287291
_finish_publish_version_cv.notify_one();

fe/fe-core/src/main/java/com/starrocks/catalog/DataProperty.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public static DataProperty getInferredDefaultDataProperty() {
9191
for (Backend backend : backends) {
9292
if (backend.hasPathHash()) {
9393
mediumSet.addAll(backend.getDisks().values().stream()
94-
.filter(v -> v.getState() == DiskInfo.DiskState.ONLINE)
94+
.filter(DiskInfo::canCreateTablet)
9595
.map(DiskInfo::getStorageMedium).collect(Collectors.toSet()));
9696
}
9797
}

fe/fe-core/src/main/java/com/starrocks/catalog/DiskInfo.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,14 @@
4949
public class DiskInfo implements Writable {
5050
// State of a backend disk as tracked by the FE.
// canReadWrite() accepts ONLINE and DECOMMISSIONED; canCreateTablet() accepts only ONLINE.
public enum DiskState {
5151
ONLINE,
52-
OFFLINE
52+
// Reported from BE: if a disk is detected as unavailable, its state will be OFFLINE.
53+
// Tablets on an OFFLINE disk will be dropped.
54+
OFFLINE,
55+
// Set by user; tablets on a DISABLED disk will be dropped.
56+
DISABLED,
57+
// Set by user; tablets on a DECOMMISSIONED disk will be cloned to other backends.
58+
// Until the decommission finishes, the disk is still usable, i.e. it can still serve reads and writes.
59+
DECOMMISSIONED
5360
}
5461

5562
private static final Logger LOG = LogManager.getLogger(DiskInfo.class);
@@ -129,6 +136,14 @@ public DiskState getState() {
129136
return state;
130137
}
131138

139+
// Returns true when the disk state is ONLINE or DECOMMISSIONED, i.e. a decommissioned disk
// still counts as readable and writable; OFFLINE and DISABLED disks do not.
public boolean canReadWrite() {
140+
return state == DiskState.ONLINE || state == DiskState.DECOMMISSIONED;
141+
}
142+
143+
// Returns true only for ONLINE disks. Unlike canReadWrite(), a DECOMMISSIONED disk is
// excluded, so no new tablets are placed on a disk that is being drained.
public boolean canCreateTablet() {
144+
return state == DiskState.ONLINE;
145+
}
146+
132147
// return true if changed
133148
public boolean setState(DiskState state) {
134149
if (this.state != state) {

fe/fe-core/src/main/java/com/starrocks/catalog/LocalTablet.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public enum TabletStatus {
8383
COLOCATE_MISMATCH, // replicas do not all locate in right colocate backends set.
8484
COLOCATE_REDUNDANT, // replicas match the colocate backends set, but redundant.
8585
NEED_FURTHER_REPAIR, // one of replicas need a definite repair.
86+
DISK_MIGRATION, // The disk where the replica is located is decommissioned.
8687
}
8788

8889
// Most read only accesses to replicas should acquire db lock, to prevent
@@ -586,7 +587,8 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
586587

587588
int alive = 0;
588589
int aliveAndVersionComplete = 0;
589-
int stable = 0;
590+
int backendStable = 0;
591+
int diskStable = 0;
590592

591593
Replica needFurtherRepairReplica = null;
592594
Set<String> hosts = Sets.newHashSet();
@@ -617,7 +619,13 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
617619
// this replica is alive, version complete, but backend is not available
618620
continue;
619621
}
620-
stable++;
622+
backendStable++;
623+
624+
if (backend.isDiskDecommissioned(replica.getPathHash())) {
625+
// disk in decommission state
626+
continue;
627+
}
628+
diskStable++;
621629
}
622630

623631
// 1. alive replicas are not enough
@@ -628,7 +636,7 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
628636
if (needRecoverWithEmptyTablet(systemInfoService)) {
629637
LOG.info("need to forcefully recover with empty tablet for {}, replica info:{}",
630638
id, getReplicaInfos());
631-
return createRedundantSchedCtx(TabletStatus.FORCE_REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH,
639+
return createRedundantSchedCtx(TabletStatus.FORCE_REDUNDANT, Priority.VERY_HIGH,
632640
needFurtherRepairReplica);
633641
}
634642

@@ -644,7 +652,7 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
644652
// at least one backend for new replica.
645653
// 4. replicationNum > 1: if replication num is set to 1, do not delete any replica, for safety reason
646654
// For example: 3 replica, 3 be, one set bad, we need to forcefully delete one first
647-
return createRedundantSchedCtx(TabletStatus.FORCE_REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH,
655+
return createRedundantSchedCtx(TabletStatus.FORCE_REDUNDANT, Priority.VERY_HIGH,
648656
needFurtherRepairReplica);
649657
} else {
650658
List<Long> availableBEs = systemInfoService.getAvailableBackendIds();
@@ -654,26 +662,26 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
654662
// of load task won't be blocked either.
655663
if (availableBEs.size() > alive) {
656664
if (alive < (replicationNum / 2) + 1) {
657-
return Pair.create(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.HIGH);
665+
return Pair.create(TabletStatus.REPLICA_MISSING, Priority.HIGH);
658666
} else if (alive < replicationNum) {
659-
return Pair.create(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.NORMAL);
667+
return Pair.create(TabletStatus.REPLICA_MISSING, Priority.NORMAL);
660668
}
661669
}
662670
}
663671

664672
// 2. version complete replicas are not enough
665673
if (aliveAndVersionComplete < (replicationNum / 2) + 1) {
666-
return Pair.create(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.HIGH);
674+
return Pair.create(TabletStatus.VERSION_INCOMPLETE, Priority.HIGH);
667675
} else if (aliveAndVersionComplete < replicationNum) {
668-
return Pair.create(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL);
676+
return Pair.create(TabletStatus.VERSION_INCOMPLETE, Priority.NORMAL);
669677
} else if (aliveAndVersionComplete > replicationNum) {
670678
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
671-
return createRedundantSchedCtx(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH,
679+
return createRedundantSchedCtx(TabletStatus.REDUNDANT, Priority.VERY_HIGH,
672680
needFurtherRepairReplica);
673681
}
674682

675683
// 3. replica is under relocating
676-
if (stable < replicationNum) {
684+
if (backendStable < replicationNum) {
677685
List<Long> replicaBeIds = replicas.stream()
678686
.map(Replica::getBackendId).collect(Collectors.toList());
679687
List<Long> availableBeIds = aliveBeIdsInCluster.stream()
@@ -683,25 +691,30 @@ private Pair<TabletStatus, TabletSchedCtx.Priority> getHealthStatusWithPriorityU
683691
&& availableBeIds.size() >= replicationNum
684692
&& replicationNum > 1) { // Doesn't have any BE that can be chosen to create a new replica
685693
return createRedundantSchedCtx(TabletStatus.FORCE_REDUNDANT,
686-
stable < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.NORMAL :
687-
TabletSchedCtx.Priority.LOW, needFurtherRepairReplica);
694+
backendStable < (replicationNum / 2) + 1 ? Priority.NORMAL :
695+
Priority.LOW, needFurtherRepairReplica);
688696
}
689-
if (stable < (replicationNum / 2) + 1) {
690-
return Pair.create(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.NORMAL);
697+
if (backendStable < (replicationNum / 2) + 1) {
698+
return Pair.create(TabletStatus.REPLICA_RELOCATING, Priority.NORMAL);
691699
} else {
692700
return Pair.create(TabletStatus.REPLICA_RELOCATING, Priority.LOW);
693701
}
694702
}
695703

696-
// 4. replica redundant
704+
// 4. disk decommission
705+
if (diskStable < replicationNum) {
706+
return Pair.create(TabletStatus.DISK_MIGRATION, Priority.NORMAL);
707+
}
708+
709+
// 5. replica redundant
697710
if (replicas.size() > replicationNum) {
698711
// we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly.
699-
return createRedundantSchedCtx(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH,
712+
return createRedundantSchedCtx(TabletStatus.REDUNDANT, Priority.VERY_HIGH,
700713
needFurtherRepairReplica);
701714
}
702715

703-
// 5. healthy
704-
return Pair.create(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
716+
// 6. healthy
717+
return Pair.create(TabletStatus.HEALTHY, Priority.NORMAL);
705718
}
706719

707720
public TabletStatus getColocateHealthStatus(long visibleVersion,
@@ -752,6 +765,7 @@ && containsAnyHighPrioBackend(replicaBackendIds, Config.tablet_sched_colocate_ba
752765
}
753766
}
754767

768+
int diskStableCnt = 0;
755769
// 2. check version completeness
756770
for (Replica replica : replicas) {
757771
// do not check the replica that is not in the colocate backend set,
@@ -771,9 +785,19 @@ && containsAnyHighPrioBackend(replicaBackendIds, Config.tablet_sched_colocate_ba
771785
// this replica is alive but version incomplete
772786
return TabletStatus.VERSION_INCOMPLETE;
773787
}
788+
789+
Backend backend = GlobalStateMgr.getCurrentSystemInfo().getBackend(replica.getBackendId());
790+
if (backend != null && !backend.isDiskDecommissioned(replica.getPathHash())) {
791+
diskStableCnt++;
792+
}
793+
}
794+
795+
// 3. check disk decommission
796+
if (diskStableCnt < replicationNum) {
797+
return TabletStatus.DISK_MIGRATION;
774798
}
775799

776-
// 3. check redundant
800+
// 4. check redundant
777801
if (replicas.size() > replicationNum) {
778802
return TabletStatus.COLOCATE_REDUNDANT;
779803
}

0 commit comments

Comments
 (0)