diff --git a/conanfile.py b/conanfile.py index 1becd88d..5384e09d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.4.1" + version = "2.4.2" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index bfc111c9..d71777c9 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -27,7 +27,6 @@ struct ShardInfo { uint64_t last_modified_time; uint64_t available_capacity_bytes; uint64_t total_capacity_bytes; - uint64_t deleted_capacity_bytes; std::optional< peer_id_t > current_leader{std::nullopt}; auto operator<=>(ShardInfo const& rhs) const { return id <=> rhs.id; } diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 03a17775..509c2aad 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -162,8 +162,8 @@ class HomeObjectImpl : public HomeObject, /// BlobManager BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&, trace_id_t tid) final; - BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off, - uint64_t len, trace_id_t tid) const final; + BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off, uint64_t len, + trace_id_t tid) const final; BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid) final; }; diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index d1c44195..2c518f4e 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -127,7 +127,10 @@ add_test(NAME HomestoreResyncTestWithLeaderRestart add_executable(homestore_test_gc) target_sources(homestore_test_gc PRIVATE $) target_link_libraries(homestore_test_gc PUBLIC homeobject_homestore ${COMMON_TEST_DEPS}) -add_test(NAME HomestoreTestGC COMMAND homestore_test_pg -csv error --executor immediate --config_path ./ - --override_config homestore_config.consensus.snapshot_freq_distance:0 - --override_config homestore_config.consensus.max_grpc_message_size:138412032) +add_test(NAME HomestoreTestGC COMMAND homestore_test_gc -csv error --executor immediate --config_path ./ + --override_config hs_backend_config.enable_gc=true + --override_config hs_backend_config.gc_garbage_rate_threshold=0 + --override_config hs_backend_config.gc_scan_interval_sec=5 + # remove this until gc supports baseline resync + --override_config homestore_config.consensus.snapshot_freq_distance:0) diff --git a/src/lib/homestore_backend/gc_manager.cpp b/src/lib/homestore_backend/gc_manager.cpp index 4379db13..f5e71b1c 100644 --- a/src/lib/homestore_backend/gc_manager.cpp +++ b/src/lib/homestore_backend/gc_manager.cpp @@ -76,12 +76,11 @@ void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* met void GCManager::on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) { homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb( GCManager::_gc_reserved_chunk_meta_name); - reserved_chunk_sb.load(buf, meta_cookie); - auto chunk_id = reserved_chunk_sb->chunk_id; + auto chunk_id = reserved_chunk_sb.load(buf, meta_cookie)->chunk_id; auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id(); auto gc_actor = get_pdev_gc_actor(pdev_id); RELEASE_ASSERT(gc_actor, "can not get 
gc actor for pdev {}!", pdev_id); - gc_actor->add_reserved_chunk(chunk_id); + gc_actor->add_reserved_chunk(std::move(reserved_chunk_sb)); // mark a reserved chunk as gc state, so that it will not be selected as a gc candidate m_chunk_selector->try_mark_chunk_to_gc_state(chunk_id, true /* force */); } @@ -94,10 +93,12 @@ void GCManager::start() { LOGINFO("start gc actor for pdev={}", pdev_id); } + const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec); + m_gc_timer_hdl = iomanager.schedule_global_timer( - GC_SCAN_INTERVAL_SEC * 1000 * 1000 * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_user, + gc_scan_interval_sec * 1000 * 1000 * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_user, [this](void*) { scan_chunks_for_gc(); }, true /* wait_to_schedule */); - LOGINFO("gc scheduler timer has started, interval is set to {} seconds", GC_SCAN_INTERVAL_SEC); + LOGINFO("gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec); } void GCManager::stop() { @@ -152,32 +153,41 @@ std::shared_ptr< GCManager::pdev_gc_actor > GCManager::get_pdev_gc_actor(uint32_ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) { auto chunk = m_chunk_selector->get_extend_vchunk(chunk_id); + const auto defrag_blk_num = chunk->get_defrag_nblks(); + + if (!defrag_blk_num) { + LOGDEBUG("chunk_id={} has no defrag blk, skip gc", chunk_id); + return false; + } + // 1 if the chunk state is inuse, it is occupied by a open shard, so it can not be selected and we don't need gc it. // 2 if the chunk state is gc, it means this chunk is being gc, or this is a reserved chunk, so we don't need gc it. if (chunk->m_state != ChunkState::AVAILABLE) { - LOGINFO("chunk_id={} state is {}, not eligible for gc", chunk_id, chunk->m_state) + LOGDEBUG("chunk_id={} state is {}, not eligible for gc", chunk_id, chunk->m_state) return false; } // it does not belong to any pg, so we don't need to gc it. if (!chunk->m_pg_id.has_value()) { - LOGINFO("chunk_id={} belongs to no pg, not eligible for gc", chunk_id) + LOGDEBUG("chunk_id={} belongs to no pg, not eligible for gc", chunk_id) return false; } - LOGINFO("chunk_id={} is eligible for gc, belongs to pg {}", chunk_id, chunk->m_pg_id.value()); + LOGDEBUG("chunk_id={} is eligible for gc, belongs to pg {}", chunk_id, chunk->m_pg_id.value()); + const auto total_blk_num = chunk->get_total_blks(); - auto defrag_blk_num = chunk->get_defrag_nblks(); - auto total_blk_num = chunk->get_total_blks(); + const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold); // defrag_blk_num > (GC_THRESHOLD_PERCENT/100) * total_blk_num, to avoid floating point number calculation // TODO: avoid overflow here. 
- return 100 * defrag_blk_num > total_blk_num * GC_GARBAGE_RATE_THRESHOLD; + return 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold; } void GCManager::scan_chunks_for_gc() { + const auto reserved_chunk_num_per_pdev = HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev); + const auto reserved_chunk_num_per_pdev_for_egc = HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev_for_egc); + for (const auto& [pdev_id, chunks] : m_chunk_selector->get_pdev_chunks()) { - // in every iteration, we will select at most 2 * RESERVED_CHUNK_NUM_PER_PDEV gc tasks - auto max_task_num = 2 * (RESERVED_CHUNK_NUM_PER_PDEV - RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC); + auto max_task_num = 2 * (reserved_chunk_num_per_pdev - reserved_chunk_num_per_pdev_for_egc); auto it = m_pdev_gc_actors.find(pdev_id); RELEASE_ASSERT(it != m_pdev_gc_actors.end(), "can not find gc actor for pdev_id {} when scanning chunks for gc", pdev_id); @@ -212,7 +222,7 @@ GCManager::pdev_gc_actor::pdev_gc_actor(uint32_t pdev_id, std::shared_ptr< HeapC std::shared_ptr< GCBlobIndexTable > index_table, HSHomeObject* homeobject) : m_pdev_id{pdev_id}, m_chunk_selector{chunk_selector}, - m_reserved_chunk_queue{RESERVED_CHUNK_NUM_PER_PDEV}, + m_reserved_chunk_queue{HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev)}, m_index_table{index_table}, m_hs_home_object{homeobject} { RELEASE_ASSERT(index_table, "index_table for a gc_actor should not be nullptr!!!"); @@ -224,14 +234,19 @@ void GCManager::pdev_gc_actor::start() { LOGERROR("pdev gc actor for pdev_id={} is already started, no need to start again!", m_pdev_id); return; } - RELEASE_ASSERT(RESERVED_CHUNK_NUM_PER_PDEV > RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC, - "reserved chunk number {} per pdev should be greater than {}", RESERVED_CHUNK_NUM_PER_PDEV, - RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC); + + const auto reserved_chunk_num_per_pdev = HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev); + const auto reserved_chunk_num_per_pdev_for_egc = HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev_for_egc); + + RELEASE_ASSERT(reserved_chunk_num_per_pdev > reserved_chunk_num_per_pdev_for_egc, + "reserved chunk number {} per pdev should be greater than {}", reserved_chunk_num_per_pdev, + reserved_chunk_num_per_pdev_for_egc); // thread number is the same as reserved chunk, which can make sure every gc thread can take a reserved chunk // for gc - m_gc_executor = std::make_shared< folly::IOThreadPoolExecutor >(RESERVED_CHUNK_NUM_PER_PDEV - - RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC); - m_egc_executor = std::make_shared< folly::IOThreadPoolExecutor >(RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC); + m_gc_executor = std::make_shared< folly::IOThreadPoolExecutor >(reserved_chunk_num_per_pdev - + reserved_chunk_num_per_pdev_for_egc); + + m_egc_executor = std::make_shared< folly::IOThreadPoolExecutor >(reserved_chunk_num_per_pdev_for_egc); LOGINFO("pdev gc actor for pdev_id={} has started", m_pdev_id); } @@ -250,7 +265,10 @@ void GCManager::pdev_gc_actor::stop() { LOGINFO("pdev gc actor for pdev_id={} has stopped", m_pdev_id); } -void GCManager::pdev_gc_actor::add_reserved_chunk(chunk_id_t chunk_id) { +void GCManager::pdev_gc_actor::add_reserved_chunk( + homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb) { + auto chunk_id = reserved_chunk_sb->chunk_id; + m_reserved_chunks.emplace_back(std::move(reserved_chunk_sb)); m_reserved_chunk_queue.blockingWrite(chunk_id); } @@ -261,12 +279,12 @@ folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority if 
(sisl_unlikely(priority == static_cast< uint8_t >(task_priority::emergent))) { m_egc_executor->add([this, priority, move_from_chunk, promise = std::move(promise)]() mutable { - LOGINFO("start emergent gc task : move_from_chunk_id={}, priority={}", move_from_chunk, priority); + LOGDEBUG("start emergent gc task : move_from_chunk_id={}, priority={}", move_from_chunk, priority); process_gc_task(move_from_chunk, priority, std::move(promise)); }); } else { m_gc_executor->add([this, priority, move_from_chunk, promise = std::move(promise)]() mutable { - LOGINFO("start gc task : move_from_chunk_id={}, priority={}", move_from_chunk, priority); + LOGDEBUG("start gc task : move_from_chunk_id={}, priority={}", move_from_chunk, priority); process_gc_task(move_from_chunk, priority, std::move(promise)); }); } @@ -286,30 +304,35 @@ void GCManager::pdev_gc_actor::handle_recovered_gc_task(const GCManager::gc_task // 1 we need to move the move_to_chunk out of the reserved chunk queue std::list< chunk_id_t > reserved_chunks; - chunk_id_t chunk_id; + chunk_id_t chunk_id{0}; for (; m_reserved_chunk_queue.read(chunk_id);) { if (chunk_id == move_to_chunk) { - // we found the chunk to be moved, so we can stop reading + // we found the chunk to be moved to, so we can stop reading break; } reserved_chunks.emplace_back(chunk_id); } + + RELEASE_ASSERT(chunk_id == move_to_chunk, + "can not find move_to_chunk={} in reserved chunk queue, move_from_chunk={}, priority={}", + move_to_chunk, move_from_chunk, priority); + // now we need to put the reserved chunks back to the reserved chunk queue for (const auto& reserved_chunk : reserved_chunks) { m_reserved_chunk_queue.blockingWrite(reserved_chunk); } - // 2 we need to select the move_from_chunk out of per pg chunk heap in chunk selector if it is a gc task with - // normal priority. for the task with emergent priority, it is already selected since it is now used for an open - // shard. - + // 2 make the move_from_chunk to gc state auto vchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk); RELEASE_ASSERT(vchunk, "can not find vchunk for pchunk {} in chunk selector!", move_from_chunk); RELEASE_ASSERT(vchunk->m_pg_id.has_value(), "chunk_id={} is expected to belong to a pg, but not !", move_from_chunk); m_chunk_selector->try_mark_chunk_to_gc_state(move_from_chunk, true /* force */); - // 3 now we can switch the two chunks. + + // 3 now we can switch the two chunks. we can make sure that all the valid data in move_from_chunk has been copied + // to move_to_chunk, and all the blob -> (new pba) have been written to the gc index table. what we need to do is + // just updating blob indexes in pg index table according to the blob indexes in gc index table. 
if (!replace_blob_index(move_from_chunk, move_to_chunk, priority)) { RELEASE_ASSERT(false, "failed to handle recovered gc task for move_from_chunk={} to move_to_chunk={} with priority={}", @@ -319,6 +342,124 @@ void GCManager::pdev_gc_actor::handle_recovered_gc_task(const GCManager::gc_task bool GCManager::pdev_gc_actor::replace_blob_index(chunk_id_t move_from_chunk, chunk_id_t move_to_chunk, uint8_t priority) { + // 1 get all blob indexes from gc index table + std::vector< std::pair< BlobRouteByChunkKey, BlobRouteValue > > valid_blob_indexes; + auto start_key = BlobRouteByChunkKey{BlobRouteByChunk(move_to_chunk, 0, 0)}; + auto end_key = BlobRouteByChunkKey{BlobRouteByChunk{move_to_chunk, std::numeric_limits< uint64_t >::max(), + std::numeric_limits< uint64_t >::max()}}; + homestore::BtreeQueryRequest< BlobRouteByChunkKey > query_req{homestore::BtreeKeyRange< BlobRouteByChunkKey >{ + std::move(start_key), true /* inclusive */, std::move(end_key), true /* inclusive */ + }}; + + auto ret = m_index_table->query(query_req, valid_blob_indexes); + if (ret != homestore::btree_status_t::success) { + // "ret != homestore::btree_status_t::has_more" is not expetced here, since we are querying all the pbas in one + // time. + // TODO:: handle the error case here. + LOGERROR("Failed to query blobs in gc index table for ret={} move_to_chunk={}", ret, move_to_chunk); + m_reserved_chunk_queue.blockingWrite(move_to_chunk); + return false; + } + + // 2 get pg index table + auto move_from_vchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk); + RELEASE_ASSERT(move_from_vchunk->m_pg_id.has_value(), "chunk_id={} is expected to belong to a pg, but not!", + move_from_chunk); + auto pg_id = move_from_vchunk->m_pg_id.value(); + auto hs_pg = m_hs_home_object->get_hs_pg(pg_id); + + // TODO:: add logic to handle pg_index_table is nullptr if destroying pg happens when GC + RELEASE_ASSERT(hs_pg, "Unknown PG for pg_id={}", pg_id); + auto pg_index_table = hs_pg->index_table_; + RELEASE_ASSERT(pg_index_table, "Index table not found for PG pg_id={}", pg_id); + + // 3 update pg index table according to the query result of gc index table. + // BtreeRangePutRequest only support update a range of keys to the same value, so we need to update the pg + // indextable here one by one. since the update of index table is very fast , and gc is not time sensitive, so + // we do this sequentially ATM. + + // TODO:: optimization, concurrently update pg index table. 
+ for (const auto& [k, v] : valid_blob_indexes) { + const auto& shard = k.key().shard; + const auto& blob = k.key().blob; + BlobRouteKey index_key{BlobRoute{shard, blob}}; + + homestore::BtreeSinglePutRequest update_req{ + &index_key, &v, homestore::btree_put_type::UPDATE, nullptr, + [&pg_id, &shard, &blob](homestore::BtreeKey const& key, homestore::BtreeValue const& value_in_btree, + homestore::BtreeValue const& new_value) -> homestore::put_filter_decision { + BlobRouteValue existing_value{value_in_btree}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { + LOGDEBUG( + "remove tombstone when updating pg index after data copy , pg_id={}, shard_id={}, blob_id={}", + pg_id, shard, blob); + BlobRouteValue new_pba_value{new_value}; + homestore::data_service().async_free_blk(new_pba_value.pbas()); + return homestore::put_filter_decision::remove; + } + return homestore::put_filter_decision::replace; + }}; + + ret = pg_index_table->put(update_req); + + // 1 if the key exist, and the filter returns homestore::put_filter_decision::replace, then ret will be + // homestore::btree_status_t::success + + // 2 if the key exist , and the filter returns homestore::put_filter_decision::remove, the ret will be + // homestore::btree_status_t::filtered_out.(this might happen is a key is deleted after data copy but before + // replace index) + + // 3 if the key doest not exist, the ret will be homestore::btree_status_t::not_found(this might + // happen when crash recovery) + + if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::filtered_out && + ret != homestore::btree_status_t::not_found) { + LOGERROR( + "Failed to update blob in pg index table for move_from_chunk={}, error status = {}, move_to_chunk={}", + move_from_chunk, ret, move_to_chunk); + // pg index table might be partial updated, we can not put move_to_chunk back to the queue + // m_reserved_chunk_queue.blockingWrite(move_to_chunk); + return false; + } + LOGDEBUG("update index table for pg={}, ret={}, move_from_chunk={}, move_to_chunk={}, shard={}, blob={}", pg_id, + ret, move_from_chunk, move_to_chunk, shard, blob); + } + + // TODO:: revisit the following part with the consideration of persisting order for recovery. + + // 4 update pg metablk and related in-memory data structures + m_hs_home_object->update_pg_meta_after_gc(pg_id, move_from_chunk, move_to_chunk); + + // 5 update shard metablk and related in-memory data structures + m_hs_home_object->update_shard_meta_after_gc(move_from_chunk, move_to_chunk); + + // 6 change the pg_id and vchunk_id of the move_to_chunk according to move_from_chunk + auto move_to_vchunk = m_chunk_selector->get_extend_vchunk(move_to_chunk); + move_to_vchunk->m_pg_id = move_from_vchunk->m_pg_id; + move_to_vchunk->m_v_chunk_id = move_from_vchunk->m_v_chunk_id; + + // 7 update the chunk state of move_to_chunk, so that it can be select for creating shard or continue putting blob + move_from_vchunk->m_pg_id = std::nullopt; + move_from_vchunk->m_v_chunk_id = std::nullopt; + move_from_vchunk->m_state = ChunkState::GC; + + // 8 change the move_from_chunk as the new reserved chunk + for (auto& reserved_chunk : m_reserved_chunks) { + if (reserved_chunk->chunk_id == move_to_chunk) { + reserved_chunk->chunk_id = move_from_chunk; + reserved_chunk.write(); + m_reserved_chunk_queue.blockingWrite(move_from_chunk); + break; + } + } + + // 9 update the state of move_to_chunk, so that it can be used for creating shard or putting blob. 
we need to do + // this after reserved_chunk meta blk is updated, so that if crash happens, we recovery the move_to_chunk is the + // same as that before crash. here, the same means not new put_blob or create_shard happens to it, the data on the + // chunk is the same as before. + move_to_vchunk->m_state = + priority == static_cast< uint8_t >(task_priority::normal) ? ChunkState::AVAILABLE : ChunkState::INUSE; + return true; } @@ -343,88 +484,437 @@ sisl::sg_list GCManager::pdev_gc_actor::generate_shard_super_blk_sg_list(shard_i return shard_sb_sgs; } +// note that, when we copy data, there is not create shard or put blob in this chunk, only delete blob might happen. bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk_id_t move_to_chunk, bool is_emergent) { + auto move_to_vchunk = m_chunk_selector->get_extend_vchunk(move_to_chunk); + auto move_from_vchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk); + + RELEASE_ASSERT(move_to_vchunk->m_state == ChunkState::GC, "move_to_chunk={} should be in GC state, but in state {}", + move_to_chunk, move_to_vchunk->m_state); + RELEASE_ASSERT(move_from_vchunk->m_state == ChunkState::GC, + "move_from_chunk={} should be in GC state, but in state {}", move_from_chunk, + move_from_vchunk->m_state); + + auto move_to_chunk_total_blks = move_to_vchunk->get_total_blks(); + auto move_to_chunk_available_blks = move_to_vchunk->available_blks(); + + RELEASE_ASSERT(move_to_chunk_total_blks == move_to_chunk_available_blks, + "move_to_chunk should be empty, total_blks={}, available_blks={}, move_to_chunk_id={}", + move_to_chunk_total_blks, move_to_chunk_available_blks, move_to_chunk); + + auto shards = m_hs_home_object->get_shards_in_chunk(move_from_chunk); + if (shards.empty()) { + LOGWARN("no shard found in move_from_chunk, chunk_id={}, ", move_from_chunk); + return true; + } + + auto& data_service = homestore::data_service(); + + const auto last_shard_id = *(shards.rbegin()); + const auto& shard_info = m_hs_home_object->_get_hs_shard(last_shard_id)->info; + const auto& shard_state = shard_info.state; + + // the last shard that triggers emergent gc should be in open state + if (shard_state != ShardInfo::State::OPEN && is_emergent) { + LOGERROR("shard state is not open for emergent gc, shard_id={} !!!", last_shard_id); + return false; + } + + homestore::blk_alloc_hints hints; + hints.chunk_id_hint = move_to_chunk; + homestore::MultiBlkId out_blkids; + + const auto pg_id = shard_info.placement_group; + auto pg_index_table = m_hs_home_object->get_hs_pg(pg_id)->index_table_; + auto blk_size = data_service.get_blk_size(); + + for (const auto& shard_id : shards) { + bool is_last_shard = (shard_id == last_shard_id); + std::vector< std::pair< BlobRouteKey, BlobRouteValue > > valid_blob_indexes; + auto start_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::min()}}; + auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}}; + +#if 0 + // range_remove will hit "Node lock and refresh failed" in some case and reture a not_found even if some key has + // been remove, then + // 1 we can not know whether should we try , it will not return retry. + // 2 if not_found happens, whether it means the shard is empty, or just a failure when searching. + // 3 valid_blob_indexes will lost some keys since "Node lock and refresh failed" happen and the call will return + // in advance + + // so not use this until index svc has fixed this. 
delete all the tombstone keys in pg index table + // and get the valid blob keys + homestore::BtreeRangeRemoveRequest< BlobRouteKey > range_remove_req{ + homestore::BtreeKeyRange< BlobRouteKey >{ + std::move(start_key), true /* inclusive */, std::move(end_key), true /* inclusive */ + }, + nullptr, std::numeric_limits< uint32_t >::max(), + [&valid_blob_indexes](homestore::BtreeKey const& key, homestore::BtreeValue const& value) mutable -> bool { + BlobRouteValue existing_value{value}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { + // delete tombstone key value + return true; + } + valid_blob_indexes.emplace_back(key, value); + return false; + }}; + + auto status = pg_index_table->remove(range_remove_req); + if (status != homestore::btree_status_t::success && + status != homestore::btree_status_t::not_found /*empty shard*/) { + LOGWARN("can not range remove blobs with tombstone in pg index table , pg_id={}, status={}", pg_id, status); + return false; + } +#endif + + // query will never hit "Node lock and refresh failed" and never need to retry + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + true /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, + std::numeric_limits< uint32_t >::max() /* blob count in a shard will not exceed uint32_t_max*/, + [](homestore::BtreeKey const& key, homestore::BtreeValue const& value) -> bool { + BlobRouteValue existing_value{value}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { return false; } + return true; + }}; + + auto const status = pg_index_table->query(query_req, valid_blob_indexes); + if (status != homestore::btree_status_t::success) { + LOGERROR("Failed to query blobs in index table for status={} shard={}", status, shard_id); + return false; + } + + auto is_last_shard_in_emergent_chunk = is_emergent && is_last_shard; + + if (valid_blob_indexes.empty()) { + LOGDEBUG("empty shard found in move_from_chunk, chunk_id={}, shard_id={}", move_from_chunk, shard_id); + // TODO::send a delete shard request to raft channel. there is a case that when we are doing gc, the + // shard becomes empty, need to handle this case + + // we should always write a shard header for the last shard of emergent gc. + if (!is_last_shard_in_emergent_chunk) continue; + } else { + LOGDEBUG("{} valid blobs found in move_from_chunk, chunk_id={}, shard_id={}", valid_blob_indexes.size(), + move_from_chunk, shard_id); + } + + // prepare a shard header for this shard in move_to_chunk + sisl::sg_list header_sgs = generate_shard_super_blk_sg_list(shard_id); + + // we now generate shard header from metablk. the shard state in shard header blk should be open, but for sealed + // shard, the state in the generated in-memory header_sgs is sealed. + + if (!is_last_shard_in_emergent_chunk) { + // for the sealed shard, the shard state in header should also be open.now, the written header is the same + // as footer except the shard state, so we lost the original header. + r_cast< HSHomeObject::shard_info_superblk* >(header_sgs.iovs[0].iov_base)->info.state = + ShardInfo::State::OPEN; + + // TODO:: get the original header from the move_from_chunk and change the following part if needed. 
+ /* + uint64_t created_time; + uint64_t last_modified_time; + uint64_t available_capacity_bytes; + uint64_t total_capacity_bytes; + */ + } + + // for emergent gc, we directly use the current shard header as the new header + + // TODO::involve ratelimiter in the following code, where read/write are scheduled. or do we need a central + // ratelimter shard by all components except client io? + auto succeed_copying_shard = + // 1 write the shard header to move_to_chunk + data_service.async_alloc_write(header_sgs, hints, out_blkids) + .thenValue([this, &hints, &move_to_chunk, &move_from_chunk, &is_emergent, &is_last_shard, &shard_id, + &blk_size, &valid_blob_indexes, &data_service, + header_sgs = std::move(header_sgs)](auto&& err) { + RELEASE_ASSERT(header_sgs.iovs.size() == 1, "header_sgs.iovs.size() should be 1, but not!"); + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(header_sgs.iovs[0].iov_base)); + if (err) { + LOGERROR("Failed to write shard header for move_to_chunk={} shard_id={}, err={}", move_to_chunk, + shard_id, err.value()); + return folly::makeFuture< bool >(false); + } + + if (valid_blob_indexes.empty()) { + RELEASE_ASSERT(is_emergent, + "find empty shard in move_from_chunk={} " + "but is_emergent is false, shard_id={}", + move_from_chunk, shard_id); + return folly::makeFuture< bool >(true); + } + + std::vector< folly::Future< bool > > futs; + + // 2 copy all the valid blobs in the shard from move_from_chunk to move_to_chunk + for (const auto& [k, v] : valid_blob_indexes) { + // k is shard_id + blob_id, v is multiblk id + auto pba = v.pbas(); + auto total_size = pba.blk_count() * blk_size; + + // buffer for read and write data + sisl::sg_list data_sgs; + data_sgs.size = total_size; + data_sgs.iovs.emplace_back( + iovec{.iov_base = iomanager.iobuf_alloc(blk_size, total_size), .iov_len = total_size}); + + futs.emplace_back(std::move( + // read blob from move_from_chunk + data_service.async_read(pba, data_sgs, total_size) + .thenValue([this, k, &hints, &move_from_chunk, &move_to_chunk, &data_service, + data_sgs = std::move(data_sgs)](auto&& err) { + RELEASE_ASSERT(data_sgs.iovs.size() == 1, + "data_sgs.iovs.size() should be 1, but not!"); + if (err) { + LOGERROR("Failed to read blob from move_from_chunk={}, shard_id={}, " + "blob_id={}: err={}", + move_from_chunk, k.key().shard, k.key().blob, err.value()); + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(data_sgs.iovs[0].iov_base)); + return folly::makeFuture< bool >(false); + } + + // write the blob to the move_to_chunk. we do not care about the blob order in a + // shard since we can not guarantee a certain order + homestore::MultiBlkId new_pba; + return data_service.async_alloc_write(data_sgs, hints, new_pba) + .thenValue([this, k, new_pba, &move_to_chunk, + data_sgs = std::move(data_sgs)](auto&& err) { + RELEASE_ASSERT(data_sgs.iovs.size() == 1, + "data_sgs.iovs.size() should be 1, but not!"); + iomanager.iobuf_free( + reinterpret_cast< uint8_t* >(data_sgs.iovs[0].iov_base)); + if (err) { + LOGERROR("Failed to write blob to move_to_chunk={}, shard_id={}, " + "blob_id={}, err={}", + move_to_chunk, k.key().shard, k.key().blob, err.value()); + return false; + } + + // insert a new entry to gc index table for this blob. 
[move_to_chunk_id, + // shard_id, blob_id] -> [new pba] + BlobRouteByChunkKey key{ + BlobRouteByChunk{move_to_chunk, k.key().shard, k.key().blob}}; + BlobRouteValue value{new_pba}, existing_value; + + homestore::BtreeSinglePutRequest put_req{ + &key, &value, homestore::btree_put_type::INSERT, &existing_value}; + auto status = m_index_table->put(put_req); + if (status != homestore::btree_status_t::success) { + LOGERROR("Failed to insert new key to gc index table for " + "move_to_chunk={}, shard_id={}, blob_id={}, err={}", + move_to_chunk, k.key().shard, k.key().blob, status); + return false; + } + LOGDEBUG("successfully insert new key to gc index table for " + "move_to_chunk={}, shard_id={}, blob_id={}", + move_to_chunk, k.key().shard, k.key().blob); + return true; + }); + }))); + } + + // 3 write a shard footer for this shard + sisl::sg_list footer_sgs = generate_shard_super_blk_sg_list(shard_id); + return folly::collectAllUnsafe(futs) + .thenValue([this, &is_emergent, &is_last_shard, &shard_id, &blk_size, &hints, &move_to_chunk, + &data_service, footer_sgs](auto&& results) { + for (auto const& ok : results) { + RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data"); + if (!ok.value()) { + // if any op fails, we drop this gc task. + return folly::makeFuture< std::error_code >( + std::make_error_code(std::errc::operation_canceled)); + } + } + + // the shard that triggers emergent gc should be the last shard in the chunk, so it should + // be open and we skip writing the footer for this case. + if (is_emergent && is_last_shard) { + LOGDEBUG( + "skip writing the footer for move_to_chunk={} shard_id={} for emergent gc task", + move_to_chunk, shard_id); + return folly::makeFuture< std::error_code >(std::error_code{}); + } + homestore::MultiBlkId out_blkids; + return data_service.async_alloc_write(footer_sgs, hints, out_blkids); + }) + .thenValue([this, &move_to_chunk, &shard_id, footer_sgs](auto&& err) { + RELEASE_ASSERT(footer_sgs.iovs.size() == 1, "footer_sgs.iovs.size() should be 1, but not!"); + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(footer_sgs.iovs[0].iov_base)); + + if (err) { + LOGERROR("Failed to write shard footer for move_to_chunk={} shard_id={}, " + "err={}", + move_to_chunk, shard_id, err.value()); + return false; + } + return true; + }); + }) + .get(); + + if (!succeed_copying_shard) { + LOGERROR("Failed to copy all blobs from move_from_chunk={} to move_to_chunk={} for shard_id={}", + move_from_chunk, move_to_chunk, shard_id); + return false; + } + + LOGDEBUG("successfully copy blobs from move_from_chunk={} to move_to_chunk={} for shard_id={}", move_from_chunk, + move_to_chunk, shard_id); + } + + LOGDEBUG("all valid blobs are copied from move_from_chunk={} to move_to_chunk={}", move_from_chunk, move_to_chunk); + + // we need to commit_blk for the move_to_chunk to make sure the last offset of append_blk_allocator is updated. + // However, we don`t know the exact last blk in move_to_chunk. for normal, we can use the footer blk of the last + // shard as the last blk. But, for emergent gc, all the blks in the last shard are written concurrently and there is + // no footer for the last shard. so we use a fake multiblk here to make sure the append_blk_allocator is committed + // to the exact last offset. 
+ const auto current_occupied_blk_count = move_to_vchunk->get_total_blks() - move_to_vchunk->available_blks(); + homestore::MultiBlkId commit_blk_id(0, current_occupied_blk_count, move_to_chunk); + if (data_service.commit_blk(commit_blk_id) != homestore::BlkAllocStatus::SUCCESS) { + LOGERROR("fail to commit_blk for move_to_chunk={}, move_from_chunk={}", move_to_chunk, move_from_chunk); + return false; + } + + // remove all the tombstone keys in pg index table for this chunk + // TODO:: we can enable the range_remove above and delete this part after the indexsvc issue is fixed + for (const auto& shard_id : shards) { + auto start_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::min()}}; + auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}}; + homestore::BtreeRangeRemoveRequest< BlobRouteKey > range_remove_req{ + homestore::BtreeKeyRange< BlobRouteKey >{ + std::move(start_key), true /* inclusive */, std::move(end_key), true /* inclusive */ + }, + nullptr, std::numeric_limits< uint32_t >::max(), + [](homestore::BtreeKey const& key, homestore::BtreeValue const& value) -> bool { + BlobRouteValue existing_value{value}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { + // delete tombstone key value + return true; + } + return false; + }}; + + auto status = pg_index_table->remove(range_remove_req); + if (status != homestore::btree_status_t::success && + status != homestore::btree_status_t::not_found /*empty shard*/) { + // if fail to remove tombstone, it does not matter and they will be removed in the next gc task. + LOGWARN("fail to remove tombstone for shard={}, pg_id={}, status={}", shard_id, pg_id, status); + } + // TODO:: after the completion of indexsvc, we need to retry according to the returned status. + + LOGDEBUG("remove tombstone for pg={}, shard={}, ret={}, move_from_chunk={}, move_to_chunk={},", pg_id, shard_id, + status, move_from_chunk, move_to_chunk); + } + return true; } -void GCManager::pdev_gc_actor::purge_reserved_chunk(chunk_id_t chunk) { +bool GCManager::pdev_gc_actor::purge_reserved_chunk(chunk_id_t chunk) { auto vchunk = m_chunk_selector->get_extend_vchunk(chunk); RELEASE_ASSERT(!vchunk->m_pg_id.has_value(), "chunk_id={} is expected to be a reserved chunk, and not belong to a pg", chunk); + RELEASE_ASSERT(vchunk->m_state == ChunkState::GC, + "chunk_id={} is a reserved chunk, expected to have a GC state, but actuall state is {} ", chunk, + vchunk->m_state); vchunk->reset(); // reset the chunk to make sure it is empty // clear all the entries of this chunk in the gc index table auto start_key = BlobRouteByChunkKey{BlobRouteByChunk(chunk, 0, 0)}; auto end_key = BlobRouteByChunkKey{ BlobRouteByChunk{chunk, std::numeric_limits< uint64_t >::max(), std::numeric_limits< uint64_t >::max()}}; + homestore::BtreeRangeRemoveRequest< BlobRouteByChunkKey > range_remove_req{ homestore::BtreeKeyRange< BlobRouteByChunkKey >{ - std::move(start_key), true /* inclusive */, std::move(end_key), true /* inclusive */ + std::move(start_key), true /* inclusive */ + , + std::move(end_key), true /* inclusive */ }}; + auto status = m_index_table->remove(range_remove_req); - if (status != homestore::btree_status_t::success) { - // TODO:: handle the error case here! 
- RELEASE_ASSERT(false, "can not clear gc index table, chunk_id={}", chunk); + if (status != homestore::btree_status_t::success && + status != homestore::btree_status_t::not_found /*already empty*/) { + LOGWARN("fail to purge gc index for chunk={}", chunk); + return false; } + + return true; } void GCManager::pdev_gc_actor::process_gc_task(chunk_id_t move_from_chunk, uint8_t priority, folly::Promise< bool > task) { - LOGINFO("start process gc task for move_from_chunk={} with priority={} ", move_from_chunk, priority); - - // make chunk to gc state, so that it can be select for creating shard - auto succeed = m_chunk_selector->try_mark_chunk_to_gc_state( - move_from_chunk, priority == static_cast< uint8_t >(task_priority::emergent) /* force */); + LOGDEBUG("start process gc task for move_from_chunk={} with priority={} ", move_from_chunk, priority); + auto vchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk); - // the move_from_chunk probably now is used by an open shard, so we need to check if it can be marked as gc - // state. - if (!succeed) { - LOGWARN("move_from_chunk={} is expected to be mark as gc state, but not!", move_from_chunk); + if (vchunk->m_state != ChunkState::GC) { + LOGWARN("move_from_chunk={} is expected to in GC state, but not!", move_from_chunk); task.setValue(false); return; } + RELEASE_ASSERT(vchunk->m_pg_id.has_value(), "chunk_id={} is expected to belong to a pg, but not!", move_from_chunk); + chunk_id_t move_to_chunk; - // wait for a reserved chunk to be available + + // wait for a reserved chunk to be available. now, the amount of threads in the folly executor(thread pool) is equal + // to the amount of reserved number, so we can make sure that a gc task handle thread can always get a reserved + // chunk, so acutally the blockingRead here will not block in any case and return immediately. m_reserved_chunk_queue.blockingRead(move_to_chunk); - LOGINFO("gc task for move_from_chunk={} to move_to_chunk={} with priority={} start copying data", move_from_chunk, - move_to_chunk, priority); + LOGDEBUG("gc task for move_from_chunk={} to move_to_chunk={} with priority={} start copying data", move_from_chunk, + move_to_chunk, priority); - purge_reserved_chunk(move_to_chunk); + if (!purge_reserved_chunk(move_to_chunk)) { + LOGWARN("can not purge move_to_chunk={}", move_to_chunk); + task.setValue(false); + m_reserved_chunk_queue.blockingWrite(move_to_chunk); + return; + } - if (!copy_valid_data(move_from_chunk, move_to_chunk)) { + if (!copy_valid_data(move_from_chunk, move_to_chunk, priority == static_cast< uint8_t >(task_priority::emergent))) { LOGWARN("failed to copy data from move_from_chunk={} to move_to_chunk={} with priority={}", move_from_chunk, move_to_chunk, priority); task.setValue(false); + m_reserved_chunk_queue.blockingWrite(move_to_chunk); return; } // trigger cp to make sure the offset the the append blk allocator and the wbcache of gc index table are both // flushed. auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */); - LOGINFO("CP Flush {} after copy data from move_from_chunk {} to move_to_chunk {} with priority={}", - std::move(fut).get() ? "success" : "failed", move_from_chunk, move_to_chunk, priority); + RELEASE_ASSERT(std::move(fut).get(), "expect gc index table and blk allocator to be flushed but failed!"); - // after data copy, we need persist the gc task meta blk, so that if crash happens, we can recover it after - // restart. + // after data copy, we persist the gc task meta blk. 
now, we can make sure all the valid blobs are successfully + // copyed and new blob indexes have be written to gc index table before gc task superblk is persisted. homestore::superblk< GCManager::gc_task_superblk > gc_task_sb{GCManager::_gc_task_meta_name}; gc_task_sb.create(sizeof(GCManager::gc_task_superblk)); gc_task_sb->move_from_chunk = move_from_chunk; gc_task_sb->move_to_chunk = move_to_chunk; gc_task_sb->priority = priority; - // write the gc task meta blk to the meta service, so that it can be recovered after restart + // write the gc task meta blk to the meta service, so that it can be recovered when restarting gc_task_sb.write(); - // after the data copy is done, we can switch the two chunks. - LOGINFO("gc task for move_from_chunk={} to move_to_chunk={} with priority={} start switching chunk", - move_from_chunk, move_to_chunk, priority); + LOGDEBUG("gc task for move_from_chunk={} to move_to_chunk={} with priority={} start replacing blob index", + move_from_chunk, move_to_chunk, priority); + + // no matter fail or succeed, we need to put one reserved chunk to the queue to make sure the reserved chunk will + // not lose if (!replace_blob_index(move_from_chunk, move_to_chunk, priority)) { + // if we fail to replace blob index, the worst case is some of the valid blobs index is update, but others not. + // At this moment, we can not drop any one of move_from_chunk and move_to_chunk, since they both contains valid + // blob data. we can not go ahead and + + // TODO::add a method to restore the old index if any error happen when replacing blob index RELEASE_ASSERT(false, "failed to replace blob index, move_from_chunk={} to move_to_chunk={} with priority={}", move_from_chunk, move_to_chunk, priority); } - // TODO: change the chunk state of move_to_chunk to AVAILABLE so that it can be used for new shard. - // now we can complete the task. for emergent gc, we need wait for the gc task to be completed + // trigger again to make sure the pg index wbcache is persisted. + fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */); + RELEASE_ASSERT(std::move(fut).get(), "expect pg index table to be flushed but failed!"); + gc_task_sb.destroy(); task.setValue(true); LOGINFO("gc task for move_from_chunk={} to move_to_chunk={} with priority={} is completed", move_from_chunk, diff --git a/src/lib/homestore_backend/gc_manager.hpp b/src/lib/homestore_backend/gc_manager.hpp index ed348291..ac6ebb23 100644 --- a/src/lib/homestore_backend/gc_manager.hpp +++ b/src/lib/homestore_backend/gc_manager.hpp @@ -16,24 +16,11 @@ #include "heap_chunk_selector.h" #include "index_kv.hpp" +#include "hs_backend_config.hpp" namespace homeobject { class HSHomeObject; -// TODO:: make those #define below configurable - -// Default number of chunks reserved for GC per pdev -#define RESERVED_CHUNK_NUM_PER_PDEV 6 -// reserved chunk number dedicated for emergent GC -#define RESERVED_CHUNK_NUM_DEDICATED_FOR_EGC 2 -// GC interval in seconds, this is used to schedule the GC timer -#define GC_SCAN_INTERVAL_SEC 10llu -// Garbage rate threshold, bigger than which, a gc task will be scheduled for a chunk -#define GC_GARBAGE_RATE_THRESHOLD 80 -// limit the io resource that gc thread can take, so that it will not impact the client io. -// assuming the throughput of a HDD is 300M/s(including read and write) and gc can take 10% of the io resource, which is -// 30M/s. A block is 4K, so gc can read/write 30M/s / 4K = 7680 blocks per second. 
-#define MAX_READ_WRITE_BLOCK_COUNT_PER_SECOND 7680 ENUM(task_priority, uint8_t, emergent = 0, normal, priority_count); @@ -72,7 +59,7 @@ class GCManager { struct gc_reserved_chunk_superblk { chunk_id_t chunk_id; - static std::string name() { return _gc_actor_meta_name; } + static std::string name() { return _gc_reserved_chunk_meta_name; } }; #pragma pack() @@ -114,7 +101,7 @@ class GCManager { pdev_gc_actor& operator=(pdev_gc_actor&&) = delete; public: - void add_reserved_chunk(chunk_id_t chunk_id); + void add_reserved_chunk(homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb); folly::SemiFuture< bool > add_gc_task(uint8_t priority, chunk_id_t move_from_chunk); void handle_recovered_gc_task(const GCManager::gc_task_superblk* gc_task); void start(); @@ -136,7 +123,7 @@ class GCManager { // before we select a reserved chunk and start gc, we need: // 1 clear all the entries of this chunk in the gc index table // 2 reset this chunk to make sure it is empty. - void purge_reserved_chunk(chunk_id_t move_to_chunk); + bool purge_reserved_chunk(chunk_id_t move_to_chunk); private: // utils @@ -148,10 +135,18 @@ class GCManager { folly::MPMCQueue< chunk_id_t > m_reserved_chunk_queue; std::shared_ptr< GCBlobIndexTable > m_index_table; HSHomeObject* m_hs_home_object{nullptr}; - RateLimiter m_rate_limiter{MAX_READ_WRITE_BLOCK_COUNT_PER_SECOND}; + + // limit the io resource that gc thread can take, so that it will not impact the client io. + // assuming the throughput of a HDD is 300M/s(including read and write) and gc can take 10% of the io resource, + // which is 30M/s. A block is 4K, so gc can read/write 30M/s / 4K = 7680 blocks per second. + RateLimiter m_rate_limiter{HS_BACKEND_DYNAMIC_CONFIG(max_read_write_block_count_per_second)}; + std::shared_ptr< folly::IOThreadPoolExecutor > m_gc_executor; std::shared_ptr< folly::IOThreadPoolExecutor > m_egc_executor; std::atomic_bool m_is_stopped{true}; + // since we have a very small number of reserved chunks, a vector is enough + // TODO:: use a map if we have a large number of reserved chunks + std::vector< homestore::superblk< GCManager::gc_reserved_chunk_superblk > > m_reserved_chunks; }; public: @@ -179,9 +174,9 @@ class GCManager { void start(); void stop(); -private: void scan_chunks_for_gc(); +private: void on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); void on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); void on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); diff --git a/src/lib/homestore_backend/heap_chunk_selector.cpp b/src/lib/homestore_backend/heap_chunk_selector.cpp index 8ca0a938..28d6bb9e 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/heap_chunk_selector.cpp @@ -263,6 +263,45 @@ std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, return num_chunk; } +void HeapChunkSelector::switch_chunks_for_pg(const pg_id_t pg_id, const chunk_num_t old_chunk_id, + const chunk_num_t new_chunk_id) { + LOGDEBUGMOD(homeobject, "switch chunks for pg_id={}, old_chunk={}, new_chunk={}", pg_id, old_chunk_id, + new_chunk_id); + + auto EXVchunk_old = get_extend_vchunk(old_chunk_id); + auto EXVchunk_new = get_extend_vchunk(new_chunk_id); + + auto old_available_blks = EXVchunk_old->available_blks(); + auto new_available_blks = EXVchunk_new->available_blks(); + + RELEASE_ASSERT(EXVchunk_old->m_v_chunk_id.has_value(), "old_chunk_id={} should has a vchunk_id", old_chunk_id); + 
RELEASE_ASSERT(EXVchunk_old->m_pg_id.has_value(), "old_chunk_id={} should belongs to a pg", old_chunk_id); + RELEASE_ASSERT(EXVchunk_old->m_pg_id.value() == pg_id, "old_chunk_id={} should belongs to pg={}", old_chunk_id, + pg_id); + + auto v_chunk_id = EXVchunk_old->m_v_chunk_id.value(); + + std::unique_lock lock(m_chunk_selector_mtx); + auto pg_it = m_per_pg_chunks.find(pg_id); + RELEASE_ASSERT(pg_it != m_per_pg_chunks.end(), "No pg_chunk_collection found for pg={}", pg_id); + auto& pg_chunk_collection = pg_it->second; + + std::unique_lock lk(pg_chunk_collection->mtx); + auto& pg_chunks = pg_chunk_collection->m_pg_chunks; + + RELEASE_ASSERT(pg_chunks[v_chunk_id]->get_chunk_id() == old_chunk_id, + "vchunk={} for pg={} should have a pchunk={} , but have a pchunk={}", v_chunk_id, pg_id, + old_chunk_id, pg_chunks[v_chunk_id]->get_chunk_id()); + + pg_chunks[v_chunk_id] = EXVchunk_new; + + LOGINFOMOD(homeobject, + "vchunk={} in pg_chunk_collection for pg_id={} has been update from pchunk_id={} to pchunk_id={}", + v_chunk_id, pg_id, old_chunk_id, new_chunk_id); + + pg_chunk_collection->available_blk_count += new_available_blks - old_available_blks; +} + bool HeapChunkSelector::recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids) { std::unique_lock lock_guard(m_chunk_selector_mtx); // check pg exist diff --git a/src/lib/homestore_backend/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h index 9891cba1..e1d451a1 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.h +++ b/src/lib/homestore_backend/heap_chunk_selector.h @@ -128,7 +128,7 @@ class HeapChunkSelector : public homestore::ChunkSelector { // this should be called on each pg meta blk found bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids); - // this should be called after all pg meta blk recovered + // this should be called after all pg meta blk and gc reserved chunk recovered void build_pdev_available_chunk_heap(); // this should be called after ShardManager is initialized and get all the open shards @@ -198,6 +198,8 @@ class HeapChunkSelector : public homestore::ChunkSelector { homestore::cshared< ExtendedVChunk > get_extend_vchunk(const chunk_num_t chunk_id) const; + void switch_chunks_for_pg(const pg_id_t pg_id, const chunk_num_t old_chunk_id, const chunk_num_t new_chunk_id); + private: void add_chunk_internal(const chunk_num_t, bool add_to_heap = true); diff --git a/src/lib/homestore_backend/hs_backend_config.fbs b/src/lib/homestore_backend/hs_backend_config.fbs index ded10ff7..944a9f95 100644 --- a/src/lib/homestore_backend/hs_backend_config.fbs +++ b/src/lib/homestore_backend/hs_backend_config.fbs @@ -18,6 +18,26 @@ table HSBackendSettings { //Reserved space in a chunk reserved_bytes_in_chunk: uint64 = 16777216 (hotswap); + + //Enable GC + //TODO: make this hotswap after gc is well tested + enable_gc: bool = false; + + //Total reserved chunk num (dedicated for gc/egc) per pdev + reserved_chunk_num_per_pdev: uint8 = 6; + + //Reserved chunk number (dedicated for egc) per pdev + reserved_chunk_num_per_pdev_for_egc: uint8 = 2; + + //GC scan interval(second) + gc_scan_interval_sec: uint64 = 60; + + //GC garbage rate threshold, upon which a chunk will be selected for gc + gc_garbage_rate_threshold: uint8 = 80; + + //max read/write block count per second, which is used by ratelimiter to limit the io resource taken by gc + max_read_write_block_count_per_second: uint16 = 7680; + } root_type HSBackendSettings; diff --git 
a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp
index 36cc44d6..61762d88 100644
--- a/src/lib/homestore_backend/hs_blob_manager.cpp
+++ b/src/lib/homestore_backend/hs_blob_manager.cpp
@@ -527,13 +527,18 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
         if (recovery_done_) {
             BLOGE(tid, blob_info.shard_id, blob_info.blob_id, "Failed to move blob to tombstone, error={}",
                   r.error());
             if (ctx) ctx->promise_.setValue(folly::makeUnexpected(r.error()));
-            return;
         } else {
             if (ctx) ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info));
-            return;
         }
+
+        // if we return directly, then from the perspective of the state machine this lsn has been committed
+        // successfully, so there will be no further deletion for this blob and, as a result, the blob leaks
+        RELEASE_ASSERT(false, "fail to commit delete blob log for pg={}, shard={}, blob={}", msg_header->pg_id,
+                       msg_header->shard_id, blob_info.blob_id);
     }
+    LOGD("shard_id={}, blob_id={} has been moved to tombstone, lsn={}", blob_info.shard_id, blob_info.blob_id, lsn);
+
     auto& multiBlks = r.value();
     if (multiBlks != tombstone_pbas) {
         repl_dev->async_free_blks(lsn, multiBlks);
diff --git a/src/lib/homestore_backend/hs_cp_callbacks.cpp b/src/lib/homestore_backend/hs_cp_callbacks.cpp
index f54a0edd..03175031 100644
--- a/src/lib/homestore_backend/hs_cp_callbacks.cpp
+++ b/src/lib/homestore_backend/hs_cp_callbacks.cpp
@@ -32,20 +32,21 @@ std::unique_ptr< CPContext > HSHomeObject::MyCPCallbacks::on_switchover_cp(CP* c
 folly::Future< bool > HSHomeObject::MyCPCallbacks::cp_flush(CP* cp) {
     std::vector< HSHomeObject::HS_PG* > dirty_pg_list;
     dirty_pg_list.reserve(home_obj_._pg_map.size());
-    {
-        std::shared_lock lock_guard(home_obj_._pg_lock);
-        for (auto const& [id, pg] : home_obj_._pg_map) {
-            auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
-
-            // All dirty durable entries are updated in the superblk. We persist outside the pg_lock
-            if (!hs_pg->is_dirty_.exchange(false)) { continue; }
-
-            hs_pg->pg_sb_->blob_sequence_num = hs_pg->durable_entities().blob_sequence_num.load();
-            hs_pg->pg_sb_->active_blob_count = hs_pg->durable_entities().active_blob_count.load();
-            hs_pg->pg_sb_->tombstone_blob_count = hs_pg->durable_entities().tombstone_blob_count.load();
-            hs_pg->pg_sb_->total_occupied_blk_count = hs_pg->durable_entities().total_occupied_blk_count.load();
-            dirty_pg_list.push_back(hs_pg);
-        }
+
+    // the metablk update in cp flush might conflict with gc, which also tries to update the metablk, so we need to
+    // hold the unique lock until all the updates are completed
+    std::unique_lock lock_guard(home_obj_._pg_lock);
+    for (auto const& [id, pg] : home_obj_._pg_map) {
+        auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
+
+        // All dirty durable entries are updated in the superblk. 
We persist outside the pg_lock + if (!hs_pg->is_dirty_.exchange(false)) { continue; } + + hs_pg->pg_sb_->blob_sequence_num = hs_pg->durable_entities().blob_sequence_num.load(); + hs_pg->pg_sb_->active_blob_count = hs_pg->durable_entities().active_blob_count.load(); + hs_pg->pg_sb_->tombstone_blob_count = hs_pg->durable_entities().tombstone_blob_count.load(); + hs_pg->pg_sb_->total_occupied_blk_count = hs_pg->durable_entities().total_occupied_blk_count.load(); + dirty_pg_list.push_back(hs_pg); } for (auto& hs_pg : dirty_pg_list) { diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index f06ffdf9..85e57674 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -223,10 +223,11 @@ void HSHomeObject::init_homestore() { repl_app->on_repl_devs_init_completed(); // we need to regitster the meta blk handlers after metaservice is ready. - gc_mgr_ = std::make_unique< GCManager >(chunk_selector_, this); + gc_mgr_ = std::make_shared< GCManager >(chunk_selector_, this); // if this is the first time we are starting, we need to create gc metablk for each pdev, which record the // reserved chunks and indextable. auto pdev_chunks = chunk_selector_->get_pdev_chunks(); + const auto reserved_chunk_num_per_pdev = HS_BACKEND_DYNAMIC_CONFIG(reserved_chunk_num_per_pdev); for (auto const& [pdev_id, chunks] : pdev_chunks) { // 1 create gc index table for each pdev auto gc_index_table = create_gc_index_table(); @@ -241,12 +242,12 @@ void HSHomeObject::init_homestore() { gc_actor_sb->index_table_uuid = uuid; gc_actor_sb.write(); - RELEASE_ASSERT(chunks.size() > RESERVED_CHUNK_NUM_PER_PDEV, + RELEASE_ASSERT(chunks.size() > reserved_chunk_num_per_pdev, "pdev {} has {} chunks, but we need at least {} chunks for reserved chunk", pdev_id, - chunks.size(), RESERVED_CHUNK_NUM_PER_PDEV); + chunks.size(), reserved_chunk_num_per_pdev); // 3 create reserved chunk meta blk for each pdev, which contains the reserved chunks. - for (size_t i = 0; i < RESERVED_CHUNK_NUM_PER_PDEV; ++i) { + for (size_t i = 0; i < reserved_chunk_num_per_pdev; ++i) { auto chunk = chunks[i]; homestore::superblk< GCManager::gc_reserved_chunk_superblk > reserved_chunk_sb{ GCManager::_gc_reserved_chunk_meta_name}; @@ -352,15 +353,21 @@ void HSHomeObject::init_cp() { void HSHomeObject::init_gc() { using namespace homestore; - if (!gc_mgr_) gc_mgr_ = std::make_unique< GCManager >(chunk_selector_, this); + if (!gc_mgr_) gc_mgr_ = std::make_shared< GCManager >(chunk_selector_, this); // when initializing, there is not gc task. 
we need to recover reserved chunks here, so that the reserved chunks
    // will not be put into pdev heap when it is built
    HomeStore::instance()->meta_service().read_sub_sb(GCManager::_gc_actor_meta_name);
    HomeStore::instance()->meta_service().read_sub_sb(GCManager::_gc_reserved_chunk_meta_name);
    HomeStore::instance()->meta_service().read_sub_sb(GCManager::_gc_task_meta_name);

-    // TODO::enable gc after we have data copy
-    // gc_mgr_->start();
+    const auto enable_gc = HS_BACKEND_DYNAMIC_CONFIG(enable_gc);
+
+    if (enable_gc) {
+        LOGI("Starting GC");
+        gc_mgr_->start();
+    } else {
+        LOGI("GC is disabled");
+    }
 }
 
 // void HSHomeObject::trigger_timed_events() { persist_pg_sb(); }
@@ -460,4 +467,13 @@ std::shared_ptr< GCBlobIndexTable > HSHomeObject::get_gc_index_table(std::string
     return it->second;
 }
 
+void HSHomeObject::trigger_immediate_gc() {
+    if (gc_mgr_) {
+        LOGI("scan chunks for gc immediately");
+        gc_mgr_->scan_chunks_for_gc();
+    } else {
+        LOGE("GC is not enabled");
+    }
+}
+
 } // namespace homeobject
diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index ec707e9f..5b653caa 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -83,7 +83,7 @@ class HSHomeObject : public HomeObjectImpl {
     std::once_flag replica_restart_flag_;
 
     // mapping from chunk to shard list.
-    folly::ConcurrentHashMap< homestore::chunk_num_t, std::set< shard_id_t > > chunk_to_shards_map_;
+    std::unordered_map< homestore::chunk_num_t, std::set< shard_id_t > > chunk_to_shards_map_;
 
 public:
 #pragma pack(1)
@@ -353,7 +353,7 @@ class HSHomeObject : public HomeObjectImpl {
         HS_Shard(homestore::superblk< shard_info_superblk >&& sb);
         ~HS_Shard() override = default;
 
-        void update_info(const ShardInfo& info);
+        void update_info(const ShardInfo& info, std::optional< homestore::chunk_num_t > p_chunk_id = std::nullopt);
         auto p_chunk_id() const { return sb_->p_chunk_id; }
     };
 
@@ -628,7 +628,7 @@ class HSHomeObject : public HomeObjectImpl {
     std::unordered_map< homestore::group_id_t, homestore::superblk< snapshot_ctx_superblk > > snp_ctx_sbs_;
     mutable std::shared_mutex snp_sbs_lock_;
     shared< HeapChunkSelector > chunk_selector_;
-    std::unique_ptr< GCManager > gc_mgr_;
+    shared< GCManager > gc_mgr_;
     unique< HttpManager > http_mgr_;
 
     bool recovery_done_{false};
@@ -783,6 +783,9 @@ class HSHomeObject : public HomeObjectImpl {
      */
     std::optional< homestore::chunk_num_t > get_shard_p_chunk_id(shard_id_t id) const;
 
+    void update_shard_meta_after_gc(const homestore::chunk_num_t move_from_chunk,
+                                    const homestore::chunk_num_t move_to_chunk);
+
     /**
      * @brief Retrieves the chunk number associated with the given shard ID.
      *
@@ -835,6 +838,7 @@ class HSHomeObject : public HomeObjectImpl {
                                cintrusive< homestore::repl_req_ctx >& hs_ctx);
 
     cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }
+    cshared< GCManager > gc_manager() const { return gc_mgr_; }
 
     // Blob manager related. 
void on_blob_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, @@ -854,7 +858,12 @@ class HSHomeObject : public HomeObjectImpl { recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb); std::optional< pg_id_t > get_pg_id_with_group_id(homestore::group_id_t group_id) const; - const auto get_shards_in_chunk(homestore::chunk_num_t chunk_id) const { return chunk_to_shards_map_.at(chunk_id); } + const std::set< shard_id_t > get_shards_in_chunk(homestore::chunk_num_t chunk_id) const; + + void update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk, + const homestore::chunk_num_t move_to_chunk); + + uint32_t get_pg_tombstone_blob_count(pg_id_t pg_id) const; // Snapshot persistence related sisl::io_blob_safe get_snapshot_sb_data(homestore::group_id_t group_id); @@ -862,6 +871,7 @@ class HSHomeObject : public HomeObjectImpl { void destroy_snapshot_sb(homestore::group_id_t group_id); const Shard* _get_hs_shard(const shard_id_t shard_id) const; std::shared_ptr< GCBlobIndexTable > get_gc_index_table(std::string uuid) const; + void trigger_immediate_gc(); private: std::shared_ptr< BlobIndexTable > create_pg_index_table(); diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index ba0b80bc..f6352a00 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -726,4 +726,120 @@ void HSHomeObject::on_create_pg_message_rollback(int64_t lsn, sisl::blob const& } } +uint32_t HSHomeObject::get_pg_tombstone_blob_count(pg_id_t pg_id) const { + // caller should hold the _pg_lock + auto hs_pg = _get_hs_pg_unlocked(pg_id); + if (hs_pg == nullptr) { + LOGW("get pg tombstone blob count with unknown pg={}", pg_id); + return 0; + } + + uint32_t tombstone_blob_count{0}; + + auto start_key = + BlobRouteKey{BlobRoute{uint64_t(pg_id) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + auto end_key = + BlobRouteKey{BlobRoute{uint64_t(pg_id + 1) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + false /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, + std::numeric_limits< uint32_t >::max() /* blob count in a shard will not exceed uint32_t_max*/, + [&tombstone_blob_count](homestore::BtreeKey const& key, homestore::BtreeValue const& value) mutable -> bool { + BlobRouteValue existing_value{value}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { tombstone_blob_count++; } + return false; + }}; + + std::vector< std::pair< BlobRouteKey, BlobRouteValue > > valid_blob_indexes; + + auto const status = hs_pg->index_table_->query(query_req, valid_blob_indexes); + if (status != homestore::btree_status_t::success && status != homestore::btree_status_t::has_more) { + LOGERROR("Failed to query blobs in index table for pg={}", pg_id); + return 0; + } + // we should not have any valid blob indexes in tombstone query + RELEASE_ASSERT(valid_blob_indexes.empty(), "we only query tombstone in pg={}, but got vaild blob in return value", + pg_id); + + return tombstone_blob_count; +} + +void HSHomeObject::update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk, + const homestore::chunk_num_t move_to_chunk) { + // 1 update pg metrics + std::unique_lock lck(_pg_lock); + auto iter = 
_pg_map.find(pg_id); + // TODO:: revisit here with the considering of destroying pg + RELEASE_ASSERT(iter != _pg_map.end(), "can not find pg_id={} in pg_map", pg_id); + auto hs_pg = dynamic_cast< HS_PG* >(iter->second.get()); + auto move_from_v_chunk = chunk_selector()->get_extend_vchunk(move_from_chunk); + + // TODO:: for now, when updating pchunk for a vchunk, we have to update the whole pg super blk. we can optimize this + // by persist a single superblk for each vchunk in the pg, so that we only need to update the vchunk superblk + // itself. + + auto pg_chunks = hs_pg->pg_sb_->get_chunk_ids_mutable(); + + RELEASE_ASSERT(move_from_v_chunk != nullptr, "can not find EXVchunk for chunk={}", move_from_chunk); + RELEASE_ASSERT(move_from_v_chunk->m_v_chunk_id.has_value(), "can not find vchunk for chunk={}, pg_id={}", + move_from_chunk, pg_id); + auto v_chunk_id = move_from_v_chunk->m_v_chunk_id.value(); + + RELEASE_ASSERT(pg_chunks[v_chunk_id] == move_from_chunk, + "vchunk_id={} chunk_id={} is not equal to move_from_chunk={} for pg={}", v_chunk_id, + pg_chunks[v_chunk_id], move_from_chunk, pg_id); + + if (pg_chunks[v_chunk_id] == move_to_chunk) { + // this might happens when crash recovery. the crash happens after pg metablk is updated but before gc task + // metablk is updated. + LOGD("the pchunk_id for vchunk_id={} for pg_id={} is already {}, skip update pg metablk", v_chunk_id, pg_id, + move_to_chunk); + } + + LOGD("pchunk for vchunk={} of pg_id={} is updated from {} to {}", v_chunk_id, pg_id, move_from_chunk, + move_to_chunk); + // update the vchunk to new pchunk(move_to_chunk) + pg_chunks[v_chunk_id] = move_to_chunk; + + // TODO:hs_pg->shards_.size() will be decreased by 1 in delete_shard if gc finds a empty shard, which will be + // implemented later + hs_pg->durable_entities_update([this, move_from_v_chunk, &move_to_chunk, &move_from_chunk, &pg_id](auto& de) { + // active_blob_count is updated by put/delete blob, not change it here. + + // considering the complexity of gc crash recovery for tombstone_blob_count, we get it directly from index table + // , which is the most accurate. + + // TODO::do we need this as durable entity? remove it and got all the from pg index in real time. + de.tombstone_blob_count = get_pg_tombstone_blob_count(pg_id); + + auto move_to_v_chunk = chunk_selector()->get_extend_vchunk(move_to_chunk); + + auto total_occupied_blk_count_by_move_from_chunk = + move_from_v_chunk->get_total_blks() - move_from_v_chunk->available_blks(); + auto total_occupied_blk_count_by_move_to_chunk = + move_to_v_chunk->get_total_blks() - move_to_v_chunk->available_blks(); + + de.total_occupied_blk_count -= + total_occupied_blk_count_by_move_from_chunk - total_occupied_blk_count_by_move_to_chunk; + + LOGD("move_from_chunk={}, total_occupied_blk_count_by_move_from_chunk={}, move_to_chunk={}, " + "total_occupied_blk_count_by_move_to_chunk={}, total_occupied_blk_count={}", + move_from_chunk, total_occupied_blk_count_by_move_from_chunk, move_to_chunk, + total_occupied_blk_count_by_move_to_chunk, de.total_occupied_blk_count.load()); + }); + + hs_pg->pg_sb_->total_occupied_blk_count = + hs_pg->durable_entities().total_occupied_blk_count.load(std::memory_order_relaxed); + + hs_pg->pg_sb_->tombstone_blob_count = + hs_pg->durable_entities().tombstone_blob_count.load(std::memory_order_relaxed); + + hs_pg->pg_sb_.write(); + + // 2 change the pg_chunkcollection in chunk selector. 
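// get_pg_tombstone_blob_count() above walks only the PG's slice of the blob index: the range starts
// at the first shard id owned by the PG (pg_id shifted by shard_width) and stops before the first
// shard id of the next PG, counting entries whose value is the tombstone sentinel. Below is a
// std-only sketch of that range scan with a std::map standing in for the B-tree; the single-integer
// key packing, the shard_width value and TOMBSTONE are illustrative assumptions, not the backend's
// actual layout.
#include <cstdint>
#include <limits>
#include <map>

constexpr unsigned shard_width = 48;                                   // assumed bit layout
constexpr uint64_t TOMBSTONE   = std::numeric_limits<uint64_t>::max(); // assumed deleted-blob marker

uint32_t count_tombstones(const std::map<uint64_t, uint64_t>& blob_index, uint64_t pg_id) {
    const uint64_t range_begin = pg_id << shard_width;        // inclusive: first key of this PG
    const uint64_t range_end   = (pg_id + 1) << shard_width;  // exclusive: first key of the next PG
    uint32_t tombstones = 0;
    for (auto it = blob_index.lower_bound(range_begin); it != blob_index.end() && it->first < range_end; ++it) {
        if (it->second == TOMBSTONE) { ++tombstones; }
    }
    return tombstones;
}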
+ chunk_selector()->switch_chunks_for_pg(pg_id, move_from_chunk, move_to_chunk); +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 124aa9e4..0334718a 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -86,7 +86,6 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { j["shard_info"]["modified_time"] = info.last_modified_time; j["shard_info"]["total_capacity"] = info.total_capacity_bytes; j["shard_info"]["available_capacity"] = info.available_capacity_bytes; - j["shard_info"]["deleted_capacity"] = info.deleted_capacity_bytes; return j.dump(); } @@ -101,7 +100,6 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); shard_info.total_capacity_bytes = shard_json["shard_info"]["total_capacity"].get< uint64_t >(); - shard_info.deleted_capacity_bytes = shard_json["shard_info"]["deleted_capacity"].get< uint64_t >(); return shard_info; } @@ -163,8 +161,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow .created_time = create_time, .last_modified_time = create_time, .available_capacity_bytes = size_bytes, - .total_capacity_bytes = size_bytes, - .deleted_capacity_bytes = 0}; + .total_capacity_bytes = size_bytes}; sb->p_chunk_id = 0; sb->v_chunk_id = v_chunkID.value(); @@ -497,6 +494,13 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } SLOGD(tid, shard_info.id, "Commit done for sealing shard"); + auto hs_pg = get_hs_pg(shard_info.placement_group); + RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_info.id, + (shard_info.id >> homeobject::shard_width), (shard_info.id & homeobject::shard_mask)); + const_cast< HS_PG* >(hs_pg)->durable_entities_update( + // shard_footer will also occupy one blk. 
+ [](auto& de) { de.total_occupied_blk_count.fetch_add(1, std::memory_order_relaxed); }); + break; } default: @@ -541,7 +545,8 @@ void HSHomeObject::add_new_shard_to_map(std::unique_ptr< HS_Shard > shard) { (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask)); const auto [it, h] = chunk_to_shards_map_.try_emplace(p_chunk_id, std::set< shard_id_t >()); - auto per_chunk_shard_list = it->second; + if (h) { LOGDEBUG("chunk_id={} is not in chunk_to_shards_map, add it", p_chunk_id); } + auto& per_chunk_shard_list = it->second; const auto inserted = (per_chunk_shard_list.emplace(shard_id)).second; RELEASE_ASSERT(inserted, "shardID=0x{:x}, pg={}, shard=0x{:x}, duplicated shard info", shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask)); @@ -575,6 +580,54 @@ std::optional< homestore::chunk_num_t > HSHomeObject::get_shard_p_chunk_id(shard return std::make_optional< homestore::chunk_num_t >(hs_shard->sb_->p_chunk_id); } +const std::set< shard_id_t > HSHomeObject::get_shards_in_chunk(homestore::chunk_num_t chunk_id) const { + const auto it = chunk_to_shards_map_.find(chunk_id); + if (it == chunk_to_shards_map_.cend()) { + LOGW("chunk_id={} not found in chunk_to_shards_map", chunk_id); + return {}; + } + return it->second; +} + +void HSHomeObject::update_shard_meta_after_gc(const homestore::chunk_num_t move_from_chunk, + const homestore::chunk_num_t move_to_chunk) { + auto shards = get_shards_in_chunk(move_from_chunk); + + // TODO::optimize this lock + std::scoped_lock lock_guard(_shard_lock); + for (const auto& shard_id : shards) { + auto shard_iter = _shard_map.find(shard_id); + + RELEASE_ASSERT( + shard_iter != _shard_map.end(), + "try to update shard meta blk after gc, but shard {} does not exist!! move_from_chunk={}, move_to_chunk={}", + shard_id, move_from_chunk, move_to_chunk); + + auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get()); + if (hs_shard->p_chunk_id() == move_to_chunk) { + // this might happens when crash recovery. the crash happens after shard metablk is updated but before gc + // task metablk is updated. 
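// The add_new_shard_to_map() change above (auto -> auto& for per_chunk_shard_list) matters because
// plain auto copies the std::set out of the map, so the subsequent emplace lands in a temporary and
// chunk_to_shards_map_ is never updated. A tiny self-contained illustration of the difference:
#include <cassert>
#include <map>
#include <set>

int main() {
    std::map<int, std::set<long>> chunk_to_shards;
    auto [it, inserted] = chunk_to_shards.try_emplace(7, std::set<long>{});
    (void)inserted;

    auto  copy = it->second;   // copies the set: inserts below never reach the map
    auto& ref  = it->second;   // refers to the set stored inside the map

    copy.emplace(42);
    assert(chunk_to_shards[7].empty());            // the copy swallowed the insert

    ref.emplace(42);
    assert(chunk_to_shards[7].count(42) == 1);     // the reference updates the map
    return 0;
}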
+ LOGD("the pchunk_id for shard_id={} for is already {}, skip update shard metablk", shard_id, move_to_chunk); + } + + auto shard_info = hs_shard->info; + shard_info.last_modified_time = get_current_timestamp(); + if (shard_info.state == ShardInfo::State::SEALED) { shard_info.available_capacity_bytes = 0; } + + // total_capacity_bytes and available_capacity_bytes are not used since we never set a limitation for shard + // capacity + + hs_shard->update_info(shard_info, move_to_chunk); + LOGD("update shard={} pchunk from {} to {}", shard_id, move_from_chunk, move_to_chunk); + } + + // switch chunk id in chunk_to_shards_map + auto iter = chunk_to_shards_map_.find(move_from_chunk); + RELEASE_ASSERT(iter != chunk_to_shards_map_.end(), "can not find old chunk_id={}", move_from_chunk); + chunk_to_shards_map_.emplace(move_to_chunk, std::move(iter->second)); + chunk_to_shards_map_.erase(iter); +} + std::optional< homestore::chunk_num_t > HSHomeObject::get_shard_v_chunk_id(shard_id_t id) const { std::scoped_lock lock_guard(_shard_lock); auto shard_iter = _shard_map.find(id); @@ -670,7 +723,9 @@ HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t p_ HSHomeObject::HS_Shard::HS_Shard(homestore::superblk< shard_info_superblk >&& sb) : Shard(sb->info), sb_(std::move(sb)) {} -void HSHomeObject::HS_Shard::update_info(const ShardInfo& shard_info) { +void HSHomeObject::HS_Shard::update_info(const ShardInfo& shard_info, + std::optional< homestore::chunk_num_t > p_chunk_id) { + if (p_chunk_id != std::nullopt) { sb_->p_chunk_id = p_chunk_id.value(); } info = shard_info; sb_->info = info; sb_.write(); diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index eba3a01d..5c337d1b 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -43,7 +43,6 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c } void ReplicationStateMachine::notify_committed_lsn(int64_t lsn) { - LOGD("got committed lsn notification , lsn={}", lsn); // handle no_space_left error if we have any const auto [target_lsn, chunk_id] = get_no_space_left_error_info(); if (std::numeric_limits< homestore::repl_lsn_t >::max() == target_lsn) { @@ -268,12 +267,13 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t } void ReplicationStateMachine::on_start_replace_member(const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid) { + const homestore::replica_member_info& member_in, trace_id_t tid) { home_object_->on_pg_start_replace_member(repl_dev()->group_id(), member_out, member_in, tid); } void ReplicationStateMachine::on_complete_replace_member(const homestore::replica_member_info& member_out, - const homestore::replica_member_info& member_in, trace_id_t tid) { + const homestore::replica_member_info& member_in, + trace_id_t tid) { home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), member_out, member_in, tid); } @@ -372,8 +372,8 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps // enable nuraft bg snapshot) occur at the same time, and free_user_snp_ctx is called first, pg_iter is // released, and then in read_snapshot_obj, pg_iter will be created with cur_obj_id_ = 0|0 while the // next_obj_id will be x|y which may hit into invalid objId condition. 
- // If inconsistency happens, reset the cursor to the beginning(0|0), and let follower to validate(lsn may change) and reset - // its cursor to the checkpoint to proceed with snapshot resync. + // If inconsistency happens, reset the cursor to the beginning(0|0), and let follower to validate(lsn may + // change) and reset its cursor to the checkpoint to proceed with snapshot resync. LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}, reset cursor to the " "beginning", log_str, pg_iter->cur_obj_id_.shard_seq_num, pg_iter->cur_obj_id_.batch_id); @@ -562,7 +562,7 @@ void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { return; } std::lock_guard lk(m_snp_sync_ctx_lock); - auto pg_iter_ptr = static_cast*>(user_snp_ctx); + auto pg_iter_ptr = static_cast< std::shared_ptr< HSHomeObject::PGBlobIterator >* >(user_snp_ctx); LOGD("Freeing snapshot iterator={}, pg={} group={}", user_snp_ctx, (*pg_iter_ptr)->pg_id_, boost::uuids::to_string((*pg_iter_ptr)->group_id_)); delete pg_iter_ptr; diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index e2f5ae50..35982314 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -83,7 +83,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar shard_sb->info.last_modified_time = shard_meta.last_modified_time(); shard_sb->info.available_capacity_bytes = shard_meta.total_capacity_bytes(); shard_sb->info.total_capacity_bytes = shard_meta.total_capacity_bytes(); - shard_sb->info.deleted_capacity_bytes = 0; shard_sb->v_chunk_id = shard_meta.vchunk_id(); homestore::blk_alloc_hints hints; diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index 1320dabd..2e9926cc 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -238,24 +238,24 @@ class HomeObjectFixture : public ::testing::Test { } // TODO:make this run in parallel - void put_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, - uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id, - bool need_sync_before_start = true) { + std::map< shard_id_t, std::map< blob_id_t, uint64_t > > + put_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, + std::map< pg_id_t, blob_id_t >& pg_blob_id, bool need_sync_before_start = true) { g_helper->sync(); + std::map< shard_id_t, std::map< blob_id_t, uint64_t > > shard_blob_ids_map; for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { if (!am_i_in_pg(pg_id)) continue; // the blob_id of a pg is a continuous number starting from 0 and increasing by 1 blob_id_t current_blob_id{pg_blob_id[pg_id]}; - for (const auto& shard_id : shard_vec) { for (uint64_t k = 0; k < num_blobs_per_shard; k++) { run_on_pg_leader(pg_id, [&]() { auto put_blob = build_blob(current_blob_id); auto tid = generateRandomTraceId(); - LOGINFO("Put blob pg={} shard {} blob {} size {} data {} trace_id={}", pg_id, shard_id, - current_blob_id, put_blob.body.size(), - hex_bytes(put_blob.body.cbytes(), std::min(10u, put_blob.body.size())), tid); + LOGDEBUG("Put blob pg={} shard {} blob {} size {} data {} trace_id={}", pg_id, shard_id, + current_blob_id, put_blob.body.size(), + hex_bytes(put_blob.body.cbytes(), std::min(10u, put_blob.body.size())), 
tid); auto b = _obj_inst->blob_manager()->put(shard_id, std::move(put_blob), tid).get(); @@ -267,7 +267,9 @@ class HomeObjectFixture : public ::testing::Test { auto blob_id = b.value(); ASSERT_EQ(blob_id, current_blob_id) << "the predicted blob id is not correct!"; }); - + auto [it, _] = shard_blob_ids_map.try_emplace(shard_id, std::map< blob_id_t, uint64_t >()); + auto& blob_ids = it->second; + blob_ids[current_blob_id] = actual_written_blk_count_for_blob(current_blob_id); current_blob_id++; } } @@ -284,6 +286,8 @@ class HomeObjectFixture : public ::testing::Test { LOGINFO("shard {} blob {} is created locally, which means all the blob before {} are created", shard_id, last_blob_id, last_blob_id); } + + return shard_blob_ids_map; } void del_blob(pg_id_t pg_id, shard_id_t shard_id, blob_id_t blob_id) { @@ -300,6 +304,29 @@ class HomeObjectFixture : public ::testing::Test { } } + void del_blobs(pg_id_t pg_id, std::map< shard_id_t, std::set< blob_id_t > > const& shard_blob_ids_map) { + g_helper->sync(); + auto tid = generateRandomTraceId(); + for (const auto& [shard_id, blob_ids] : shard_blob_ids_map) { + for (const auto& blob_id : blob_ids) { + run_on_pg_leader(pg_id, [&]() { + auto g = _obj_inst->blob_manager()->del(shard_id, blob_id, tid).get(); + ASSERT_TRUE(g); + LOGDEBUG("delete blob, pg={} shard {} blob {} trace_id={}", pg_id, shard_id, blob_id, tid); + }); + } + } + + auto last_shard_id = shard_blob_ids_map.rbegin()->first; + auto last_blob_id = *(shard_blob_ids_map.rbegin()->second.rbegin()); + // wait for the last blob to be deleted locally + + while (blob_exist(last_shard_id, last_blob_id)) { + LOGINFO("waiting for shard {} blob {} to be deleted locally", last_shard_id, last_blob_id); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } + // TODO:make this run in parallel void del_all_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) { @@ -329,6 +356,34 @@ class HomeObjectFixture : public ::testing::Test { } } + uint32_t get_valid_blob_count_in_pg(const pg_id_t pg_id) { + auto hs_pg = _obj_inst->get_hs_pg(pg_id); + if (!hs_pg) { + LOGERROR("failed to get hs_pg for pg_id={}", pg_id); + return 0; + } + + auto start_key = + BlobRouteKey{BlobRoute{uint64_t(pg_id) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + auto end_key = BlobRouteKey{ + BlobRoute{uint64_t(pg_id + 1) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + false /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, + std::numeric_limits< uint32_t >::max() /* blob count in a shard will not exceed uint32_t_max*/, + [](homestore::BtreeKey const& key, homestore::BtreeValue const& value) -> bool { + BlobRouteValue existing_value{value}; + if (existing_value.pbas() == HSHomeObject::tombstone_pbas) { return false; } + return true; + }}; + + std::vector< std::pair< BlobRouteKey, BlobRouteValue > > valid_blob_indexes; + hs_pg->index_table_->query(query_req, valid_blob_indexes); + return valid_blob_indexes.size(); + } + // TODO:make this run in parallel void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, uint64_t const num_blobs_per_shard, bool const use_random_offset = false, @@ -379,6 +434,40 @@ class HomeObjectFixture : public 
::testing::Test { } } + void verify_shard_blobs(const std::map< shard_id_t, std::set< blob_id_t > >& shard_blobs, + bool const use_random_offset = false) { + uint32_t off = 0, len = 0; + for (const auto& [shard_id, blob_ids] : shard_blobs) { + for (const auto& blob_id : blob_ids) { + auto tid = generateRandomTraceId(); + LOGDEBUG("going to verify blob shard {} blob {} trace_id={}", shard_id, blob_id, tid); + auto blob = build_blob(blob_id); + len = blob.body.size(); + if (use_random_offset) { + std::uniform_int_distribution< uint32_t > rand_off_gen{0u, len - 1u}; + std::uniform_int_distribution< uint32_t > rand_len_gen{1u, len}; + + off = rand_off_gen(rnd_engine); + len = rand_len_gen(rnd_engine); + if ((off + len) >= blob.body.size()) { len = blob.body.size() - off; } + } + + auto g = _obj_inst->blob_manager()->get(shard_id, blob_id, off, len, tid).get(); + ASSERT_TRUE(!!g) << "get blob fail, shard_id " << shard_id << " blob_id " << blob_id + << " replica number " << g_helper->replica_num(); + auto result = std::move(g.value()); + LOGDEBUG("get shard {} blob {} off {} len {} data {}", shard_id, blob_id, off, len, + hex_bytes(result.body.cbytes(), std::min(len, 10u))); + + EXPECT_EQ(result.body.size(), len); + EXPECT_EQ(std::memcmp(result.body.bytes(), blob.body.cbytes() + off, result.body.size()), 0); + EXPECT_EQ(result.user_key.size(), blob.user_key.size()); + EXPECT_EQ(blob.user_key, result.user_key); + EXPECT_EQ(blob.object_off, result.object_off); + } + } + } + void verify_obj_count(uint32_t num_pgs, uint32_t shards_per_pg, uint32_t blobs_per_shard, bool deleted_all = false) { uint32_t exp_active_blobs = deleted_all ? 0 : shards_per_pg * blobs_per_shard; @@ -467,7 +556,6 @@ class HomeObjectFixture : public ::testing::Test { EXPECT_EQ(lhs.last_modified_time, rhs.last_modified_time); EXPECT_EQ(lhs.available_capacity_bytes, rhs.available_capacity_bytes); EXPECT_EQ(lhs.total_capacity_bytes, rhs.total_capacity_bytes); - EXPECT_EQ(lhs.deleted_capacity_bytes, rhs.deleted_capacity_bytes); EXPECT_EQ(lhs.current_leader, rhs.current_leader); } @@ -611,6 +699,27 @@ class HomeObjectFixture : public ::testing::Test { BitsGenerator::gen_blob_bits(blob.body, blob_id); return blob; } + + uint64_t actual_written_blk_count_for_blob(blob_id_t blob_id) { + using homeobject::io_align; + auto blob = build_blob(blob_id); + auto blob_size = blob.body.size(); + + uint64_t actual_written_size{ + uint32_cast(sisl::round_up(sizeof(HSHomeObject::BlobHeader) + blob.user_key.size(), io_align))}; + + if (((r_cast< uintptr_t >(blob.body.cbytes()) % io_align) != 0) || ((blob_size % io_align) != 0)) { + blob_size = sisl::round_up(blob_size, io_align); + } + + actual_written_size += blob_size; + + auto pad_len = sisl::round_up(actual_written_size, HSHomeObject::_data_block_size) - actual_written_size; + if (pad_len) { actual_written_size += pad_len; } + + return actual_written_size / HSHomeObject::_data_block_size; + } + #ifdef _PRERELEASE void set_basic_flip(const std::string flip_name, int count = 1, uint32_t percent = 100) { flip::FlipCondition null_cond; diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 1609a667..3d13db6f 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -187,42 +187,7 @@ TEST_F(HomeObjectFixture, BasicPutGetBlobWithPushDataDisabled) { remove_flip("disable_leader_push_data"); } -// TEST_F(HomeObjectFixture, BasicPutGetBlobWithNoSpaceLeft) { -// 
set_basic_flip("simulate_no_space_left", std::numeric_limits< int >::max(), 50); -// -// // test recovery with pristine state firstly -// restart(); -// -// auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >(); -// auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >() / num_pgs; -// -// auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg; -// std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; -// -// // pg -> next blob_id in this pg -// std::map< pg_id_t, blob_id_t > pg_blob_id; -// -// for (uint64_t i = 1; i <= num_pgs; i++) { -// create_pg(i); -// pg_blob_id[i] = 0; -// for (uint64_t j = 0; j < num_shards_per_pg; j++) { -// auto shard = create_shard(i, 64 * Mi); -// pg_shard_id_vec[i].emplace_back(shard.id); -// LOGINFO("pg={} shard {}", i, shard.id); -// } -// } -// -// // Put blob for all shards in all pg's. -// put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); -// -// // Verify all get blobs -// verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); -// -// // Verify the stats -// verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */); -// -// remove_flip("simulate_no_space_left"); -// } +// TODO:: add a test for no_space_left without flip. #endif diff --git a/src/lib/homestore_backend/tests/hs_gc_tests.cpp b/src/lib/homestore_backend/tests/hs_gc_tests.cpp index ddecf076..3274c39a 100644 --- a/src/lib/homestore_backend/tests/hs_gc_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_gc_tests.cpp @@ -1,5 +1,603 @@ #include "homeobj_fixture.hpp" TEST_F(HomeObjectFixture, BasicGC) { - // TODO:: add UT after we have data copy implememtion -} \ No newline at end of file + const auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >(); + const auto num_shards_per_chunk = SISL_OPTIONS["num_shards"].as< uint64_t >(); + const auto num_blobs_per_shard = 2 * SISL_OPTIONS["num_blobs"].as< uint64_t >(); + + std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + std::map< pg_id_t, blob_id_t > pg_blob_id; + std::map< pg_id_t, HSHomeObject::HS_PG* > HS_PG_map; + std::map< pg_id_t, uint64_t > pg_chunk_nums; + std::map< shard_id_t, std::map< blob_id_t, uint64_t > > shard_blob_ids_map; + auto chunk_selector = _obj_inst->chunk_selector(); + + // create pgs + for (uint64_t i = 1; i <= num_pgs; i++) { + create_pg(i); + auto hs_pg = _obj_inst->get_hs_pg(i); + ASSERT_TRUE(hs_pg != nullptr); + // do not use HS_PG_map[i] to change anything, const cast just for compiling + HS_PG_map[i] = const_cast< HSHomeObject::HS_PG* >(hs_pg); + pg_blob_id[i] = 0; + pg_chunk_nums[i] = chunk_selector->get_pg_chunks(i)->size(); + } + + // create multiple shards for each chunk + for (uint64_t i = 0; i < num_shards_per_chunk; i++) { + std::map< pg_id_t, std::vector< shard_id_t > > pg_open_shard_id_vec; + + // create a shard for each chunk + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + for (uint64_t j = 0; j < chunk_num; j++) { + auto shard = create_shard(pg_id, 64 * Mi); + pg_open_shard_id_vec[pg_id].emplace_back(shard.id); + pg_shard_id_vec[pg_id].emplace_back(shard.id); + } + } + + // Put blob for all shards in all pg's. 
+ auto new_shard_blob_ids_map = put_blobs(pg_open_shard_id_vec, num_blobs_per_shard, pg_blob_id); + for (const auto& [shard_id, blob_to_blk_count] : new_shard_blob_ids_map) { + shard_blob_ids_map[shard_id].insert(blob_to_blk_count.begin(), blob_to_blk_count.end()); + } + + // seal all shards and check + for (const auto& [pg_id, shard_vec] : pg_open_shard_id_vec) { + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + // seal the shards so that they can be selected for gc + auto shard_info = seal_shard(shard_id); + EXPECT_EQ(ShardInfo::State::SEALED, shard_info.state); + + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + ASSERT_EQ(EXVchunk->m_state, ChunkState::AVAILABLE); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + + ASSERT_TRUE(EXVchunk->m_pg_id.has_value()); + ASSERT_EQ(EXVchunk->m_pg_id.value(), pg_id); + } + } + + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + uint64_t total_blob_occupied_blk_count{0}; + const auto& shard_vec = pg_shard_id_vec[pg_id]; + for (const auto& shard_id : shard_vec) { + total_blob_occupied_blk_count += 2; /*header and footer*/ + for (const auto& [_, blk_count] : shard_blob_ids_map[shard_id]) { + total_blob_occupied_blk_count += blk_count; + } + } + // check pg durable entities + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, total_blob_occupied_blk_count); + + // check pg index table, the valid blob index count should be equal to the blob count + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), pg_blob_id[pg_id]); + } + } + + // delete half of the blobs per shard. + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + std::map< shard_id_t, std::set< blob_id_t > > shard_blob_ids_map_for_deletion; + for (const auto& shard_id : shard_vec) { + shard_blob_ids_map_for_deletion[shard_id]; + auto& blob_to_blk_count = shard_blob_ids_map[shard_id]; + for (uint64_t i = 0; i < num_blobs_per_shard / 2; i++) { + ASSERT_FALSE(blob_to_blk_count.empty()); + auto it = blob_to_blk_count.begin(); + auto blob_id = it->first; + shard_blob_ids_map_for_deletion[shard_id].insert(blob_id); + blob_to_blk_count.erase(it); + } + } + del_blobs(pg_id, shard_blob_ids_map_for_deletion); + } + + // wait until all the deleted blobs are reclaimed + bool all_deleted_blobs_have_been_gc{true}; + while (true) { + // we need to recalculate this everytime, since gc might update a pchunk of the vchunk for a pg + std::map< homestore::chunk_num_t, uint64_t > chunk_used_blk_count; + + for (const auto& [shard_id, blob_to_blk_count] : shard_blob_ids_map) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + // now, the chunk state is not determined, maybe GC(being gc) or AVAILABLE(complete gc), skip checking it. 
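// The durable-entity check above rebuilds the expected per-PG block usage from the bookkeeping map
// returned by put_blobs(): every sealed shard contributes one header and one footer block, plus the
// blocks of each blob written into it. A std-only restatement of that tally; the map layout mirrors
// shard_blob_ids_map (shard id -> blob id -> block count) but the integer key types are stand-ins.
#include <cstdint>
#include <map>

uint64_t expected_occupied_blks(const std::map<uint64_t, std::map<uint64_t, uint64_t>>& shard_blob_blks) {
    uint64_t total = 0;
    for (const auto& [shard_id, blob_blks] : shard_blob_blks) {
        (void)shard_id;
        total += 2;                                   // shard header + footer of a sealed shard
        for (const auto& [blob_id, blk_count] : blob_blks) {
            (void)blob_id;
            total += blk_count;                       // blocks written for this blob's data
        }
    }
    return total;
}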
+ uint32_t used_blks{2}; /* header and footer */ + + for (const auto& [_, blk_count] : blob_to_blk_count) { + used_blks += blk_count; + } + chunk_used_blk_count[chunk_id] += used_blks; + } + + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + const auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (uint64_t i{0}; i < chunk_num; i++) { + auto chunk_id = pg_chunks->at(i); + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + const auto available_blk = EXVchunk->available_blks(); + const auto total_blks = EXVchunk->get_total_blks(); + if (total_blks - available_blk != chunk_used_blk_count[chunk_id]) { + LOGINFO("pg_id={}, chunk_id={}, available_blk={}, total_blk={}, use_blk={}, waiting for gc", pg_id, + chunk_id, available_blk, total_blks, chunk_used_blk_count[chunk_id]); + + if (0 == EXVchunk->get_defrag_nblks()) { + // some unexpect async write or free happens, increase defrag num to trigger gc again. + homestore::data_service().async_free_blk(homestore::MultiBlkId(0, 1, chunk_id)); + } + + all_deleted_blobs_have_been_gc = false; + break; + } + } + if (!all_deleted_blobs_have_been_gc) break; + } + if (all_deleted_blobs_have_been_gc) break; + all_deleted_blobs_have_been_gc = true; + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + // verify blob data after gc + std::map< shard_id_t, std::set< blob_id_t > > remaining_shard_blobs; + for (const auto& [shard_id, blob_to_blk_count] : shard_blob_ids_map) { + for (const auto& [blob_id, _] : blob_to_blk_count) { + remaining_shard_blobs[shard_id].insert(blob_id); + } + } + verify_shard_blobs(remaining_shard_blobs); + + // check vchunk to pchunk for every pg + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + // after half blobs have been deleted, the tombstone indexes(half of the total blobs) have been removed by gc + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), pg_blob_id[pg_id] / 2); + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + + // after gc , pg_chunks should changes, the vchunk shoud change to a new pchunk. 
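// The comment above and the assertions around it encode the GC invariant this test keeps checking:
// a PG addresses its chunks by virtual id, and GC only swaps which physical chunk backs a given
// virtual slot. After a move, the slot must point at the new physical chunk and nothing may still
// reference the old one. A std-only model of that bookkeeping (types and values are illustrative,
// not the backend's):
#include <cassert>
#include <vector>

int main() {
    std::vector<int> pg_chunks = {11, 12, 13};    // index = vchunk id, value = pchunk id
    const int v_chunk_id = 1;

    // GC copies the live data from pchunk 12 into a reserved pchunk 42 and rebinds the slot.
    const int move_from = pg_chunks[v_chunk_id];  // 12
    const int move_to   = 42;
    pg_chunks[v_chunk_id] = move_to;

    // Invariant after GC: the slot points at the new chunk, and no slot points at the old one.
    assert(pg_chunks[v_chunk_id] == move_to);
    for (int pchunk : pg_chunks) { assert(pchunk != move_from); }
    return 0;
}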
+ ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + + ASSERT_TRUE(EXVchunk->m_pg_id.has_value()); + ASSERT_EQ(EXVchunk->m_pg_id.value(), pg_id); + } + } + + // check pg durable entities + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + uint64_t total_blob_occupied_blk_count{0}; + const auto& shard_vec = pg_shard_id_vec[pg_id]; + for (const auto& shard_id : shard_vec) { + total_blob_occupied_blk_count += 2; /*header and footer*/ + for (const auto& [_, blk_count] : shard_blob_ids_map[shard_id]) { + total_blob_occupied_blk_count += blk_count; + } + } + + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, total_blob_occupied_blk_count); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, total_blob_occupied_blk_count); + } + + restart(); + + HS_PG_map.clear(); + + for (uint64_t i = 1; i <= num_pgs; i++) { + auto hs_pg = _obj_inst->get_hs_pg(i); + ASSERT_TRUE(hs_pg != nullptr); + HS_PG_map[i] = const_cast< HSHomeObject::HS_PG* >(hs_pg); + } + + chunk_selector = _obj_inst->chunk_selector(); + + verify_shard_blobs(remaining_shard_blobs); + + // check vchunk to pchunk for every pg + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + // after half blobs have been deleted, the tombstone indexes(half of the total blobs) have been removed by gc + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), pg_blob_id[pg_id] / 2); + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + + // after restart , the pchunk of a vchunk shoud not change + ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + } + } + + // check pg durable entities + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + uint64_t total_blob_occupied_blk_count{0}; + const auto& shard_vec = pg_shard_id_vec[pg_id]; + for (const auto& shard_id : shard_vec) { + total_blob_occupied_blk_count += 2; /*header and footer*/ + for (const auto& [_, blk_count] : shard_blob_ids_map[shard_id]) { + total_blob_occupied_blk_count += blk_count; + } + } + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, total_blob_occupied_blk_count); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, total_blob_occupied_blk_count); + } + + // delete remaining blobs + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + std::map< shard_id_t, std::set< blob_id_t > > shard_blob_ids_map_for_deletion; + for (const auto& shard_id : shard_vec) { + shard_blob_ids_map_for_deletion[shard_id] = remaining_shard_blobs[shard_id]; + } + del_blobs(pg_id, shard_blob_ids_map_for_deletion); + } + + // wait until all the deleted blobs are reclaimed + while (true) { + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + const auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (uint64_t i{0}; i < chunk_num; i++) { + auto chunk_id = pg_chunks->at(i); + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + const auto available_blk = EXVchunk->available_blks(); + const auto total_blks = EXVchunk->get_total_blks(); + if (total_blks != available_blk) { + + if (0 == EXVchunk->get_defrag_nblks()) { + // some unexpect async write or free happens, increase defrag num to trigger gc again. 
+ homestore::data_service().async_free_blk(homestore::MultiBlkId(0, 1, chunk_id)); + } + + LOGINFO("pg_id={}, chunk_id={}, available_blk={}, total_blk={}, not empty, waiting for gc", pg_id, + chunk_id, available_blk, total_blks); + all_deleted_blobs_have_been_gc = false; + break; + } + } + if (!all_deleted_blobs_have_been_gc) break; + } + if (all_deleted_blobs_have_been_gc) break; + all_deleted_blobs_have_been_gc = true; + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + for (const auto& [shard_id, _] : shard_blob_ids_map) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + const auto available_blk = EXVchunk->available_blks(); + const auto total_blks = EXVchunk->get_total_blks(); + + // the shard is empty + ASSERT_EQ(available_blk, total_blks); + } + + // after all blobs have been deleted, + // 1 the pg index table should be empty + // 2 check pg durable entities + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), 0); + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, 0); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, 0); + } + + // if all blobs of shard are deleted , the shard should be deleted. + // TODO:add more check after we have delete shard implementation +} + +TEST_F(HomeObjectFixture, BasicEGC) { + const auto num_pgs = SISL_OPTIONS["num_pgs"].as< uint64_t >(); + const auto num_shards_per_chunk = SISL_OPTIONS["num_shards"].as< uint64_t >(); + const auto num_blobs_per_shard = 2 * SISL_OPTIONS["num_blobs"].as< uint64_t >(); + std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + std::map< pg_id_t, blob_id_t > pg_blob_id; + std::map< pg_id_t, HSHomeObject::HS_PG* > HS_PG_map; + std::map< pg_id_t, uint64_t > pg_chunk_nums; + std::map< shard_id_t, std::map< blob_id_t, uint64_t > > shard_blob_ids_map; + auto chunk_selector = _obj_inst->chunk_selector(); + + for (uint16_t i = 1; i <= num_pgs; i++) { + create_pg(i); + auto hs_pg = _obj_inst->get_hs_pg(i); + ASSERT_TRUE(hs_pg != nullptr); + // do not use HS_PG_map[i] to change anything, const cast just for compiling + HS_PG_map[i] = const_cast< HSHomeObject::HS_PG* >(hs_pg); + pg_blob_id[i] = 0; + pg_chunk_nums[i] = chunk_selector->get_pg_chunks(i)->size(); + } + + // create multiple shards for each chunk , we seal all shards except the last one + for (uint64_t i = 0; i < num_shards_per_chunk; i++) { + std::map< pg_id_t, std::vector< shard_id_t > > pg_open_shard_id_vec; + + // create a shard for each chunk + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + for (uint64_t j = 0; j < chunk_num; j++) { + auto shard = create_shard(pg_id, 64 * Mi); + pg_open_shard_id_vec[pg_id].emplace_back(shard.id); + pg_shard_id_vec[pg_id].emplace_back(shard.id); + } + } + + // Put blob for all shards in all pg's. 
+ auto new_shard_blob_ids_map = put_blobs(pg_open_shard_id_vec, num_blobs_per_shard, pg_blob_id); + + for (const auto& [shard_id, blob_to_blk_count] : new_shard_blob_ids_map) { + shard_blob_ids_map[shard_id].insert(blob_to_blk_count.begin(), blob_to_blk_count.end()); + } + + // seal all shards except the last one and check + for (const auto& [pg_id, shard_vec] : pg_open_shard_id_vec) { + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + if (i < num_shards_per_chunk - 1) { + // seal the shards so that they can be selected for gc + auto shard_info = seal_shard(shard_id); + EXPECT_EQ(ShardInfo::State::SEALED, shard_info.state); + // if not the last shard, the chunk should be available + ASSERT_EQ(EXVchunk->m_state, ChunkState::AVAILABLE); + } else { + // if the last shard, the chunk should be inuse + ASSERT_EQ(EXVchunk->m_state, ChunkState::INUSE); + } + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + + ASSERT_TRUE(EXVchunk->m_pg_id.has_value()); + ASSERT_EQ(EXVchunk->m_pg_id.value(), pg_id); + } + } + } + + // delete half of the blobs per shard. + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + std::map< shard_id_t, std::set< blob_id_t > > shard_blob_ids_map_for_deletion; + for (const auto& shard_id : shard_vec) { + shard_blob_ids_map_for_deletion[shard_id]; + auto& blob_to_blk_count = shard_blob_ids_map[shard_id]; + for (uint64_t i = 0; i < num_blobs_per_shard / 2; i++) { + ASSERT_FALSE(blob_to_blk_count.empty()); + auto it = blob_to_blk_count.begin(); + auto blob_id = it->first; + shard_blob_ids_map_for_deletion[shard_id].insert(blob_id); + blob_to_blk_count.erase(it); + } + } + del_blobs(pg_id, shard_blob_ids_map_for_deletion); + } + + // do not seal the last shard and trigger gc mannually to simulate emergent gc + auto gc_mgr = _obj_inst->gc_manager(); + + // trigger egc for all chunks + std::vector< folly::SemiFuture< bool > > futs; + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + const auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (uint64_t i{0}; i < chunk_num; i++) { + auto chunk_id = pg_chunks->at(i); + futs.emplace_back(gc_mgr->submit_gc_task(task_priority::emergent, chunk_id)); + } + } + + gc_mgr.reset(); + + // wait for all egc completed + folly::collectAllUnsafe(futs) + .thenValue([](auto&& results) { + for (auto const& ok : results) { + ASSERT_TRUE(ok.hasValue()); + // all egc task should be completed + ASSERT_TRUE(ok.value()); + } + }) + .get(); + + futs.clear(); + + // verify blob data after gc + std::map< shard_id_t, std::set< blob_id_t > > remaining_shard_blobs; + for (const auto& [shard_id, blob_to_blk_count] : shard_blob_ids_map) { + for (const auto& [blob_id, _] : blob_to_blk_count) { + remaining_shard_blobs[shard_id].insert(blob_id); + } + } + verify_shard_blobs(remaining_shard_blobs); + + // check vchunk to pchunk for every pg + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + // after half blobs have been deleted, the tombstone indexes(half of the total blobs) have been removed by gc + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), pg_blob_id[pg_id] / 2); + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : 
shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + // emergent gc, then chunk is still in use + ASSERT_EQ(EXVchunk->m_state, ChunkState::INUSE); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + + // after gc , pg_chunks should changes, the vchunk shoud change to a new pchunk. + ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + + ASSERT_TRUE(EXVchunk->m_pg_id.has_value()); + ASSERT_EQ(EXVchunk->m_pg_id.value(), pg_id); + } + } + + // check pg durable entities + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + uint64_t total_blob_occupied_blk_count{0}; + const auto& shard_vec = pg_shard_id_vec[pg_id]; + for (const auto& shard_id : shard_vec) { + total_blob_occupied_blk_count += 2; /*header and footer*/ + for (const auto& [_, blk_count] : shard_blob_ids_map[shard_id]) { + total_blob_occupied_blk_count += blk_count; + } + } + // for each chunk, we have an open shard, which has only header. + total_blob_occupied_blk_count -= pg_chunk_nums[pg_id]; + + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, total_blob_occupied_blk_count); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, total_blob_occupied_blk_count); + } + + restart(); + + HS_PG_map.clear(); + + for (uint64_t i = 1; i <= num_pgs; i++) { + auto hs_pg = _obj_inst->get_hs_pg(i); + ASSERT_TRUE(hs_pg != nullptr); + HS_PG_map[i] = const_cast< HSHomeObject::HS_PG* >(hs_pg); + } + + chunk_selector = _obj_inst->chunk_selector(); + gc_mgr = _obj_inst->gc_manager(); + + verify_shard_blobs(remaining_shard_blobs); + + // check vchunk to pchunk for every pg + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + // after half blobs have been deleted, the tombstone indexes(half of the total blobs) have been removed by gc + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), pg_blob_id[pg_id] / 2); + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + // emergent gc, then chunk is still in use + ASSERT_EQ(EXVchunk->m_state, ChunkState::INUSE); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + + // after gc , pg_chunks should changes, the vchunk shoud change to a new pchunk. + ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + + ASSERT_TRUE(EXVchunk->m_pg_id.has_value()); + ASSERT_EQ(EXVchunk->m_pg_id.value(), pg_id); + } + } + + // check pg durable entities + for (const auto& [pg_id, hs_pg] : HS_PG_map) { + uint64_t total_blob_occupied_blk_count{0}; + const auto& shard_vec = pg_shard_id_vec[pg_id]; + for (const auto& shard_id : shard_vec) { + total_blob_occupied_blk_count += 2; /*header and footer*/ + for (const auto& [_, blk_count] : shard_blob_ids_map[shard_id]) { + total_blob_occupied_blk_count += blk_count; + } + } + // for each chunk, we have an open shard, which has only header. 
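// The adjustment right below corrects the 2-blocks-per-shard tally for this test: the last shard on
// each chunk is still open, so it has a header but no footer yet, i.e. one block fewer than a sealed
// shard, and there is exactly one such open shard per chunk. A tiny restatement of that arithmetic
// (names are illustrative):
#include <cstdint>

uint64_t expected_pg_occupied_blks(uint64_t tally_assuming_all_sealed, uint64_t chunks_in_pg) {
    // one open shard per chunk -> one missing footer block per chunk
    return tally_assuming_all_sealed - chunks_in_pg;
}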
+ total_blob_occupied_blk_count -= pg_chunk_nums[pg_id]; + + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, total_blob_occupied_blk_count); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, total_blob_occupied_blk_count); + } + + // delete remaining blks + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + std::map< shard_id_t, std::set< blob_id_t > > shard_blob_ids_map_for_deletion; + for (const auto& shard_id : shard_vec) { + shard_blob_ids_map_for_deletion[shard_id] = remaining_shard_blobs[shard_id]; + } + del_blobs(pg_id, shard_blob_ids_map_for_deletion); + } + + // trigger egc for all chunks + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + const auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (uint64_t i{0}; i < chunk_num; i++) { + auto chunk_id = pg_chunks->at(i); + futs.emplace_back(gc_mgr->submit_gc_task(task_priority::emergent, chunk_id)); + } + } + + // wait for all egc completed + folly::collectAllUnsafe(futs) + .thenValue([](auto&& results) { + for (auto const& ok : results) { + ASSERT_TRUE(ok.hasValue()); + // all egc task should be completed + ASSERT_TRUE(ok.value()); + } + }) + .get(); + + // for each chunk in this pg, there is only one shard header + for (const auto& [pg_id, chunk_num] : pg_chunk_nums) { + const auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (uint64_t i{0}; i < chunk_num; i++) { + auto chunk_id = pg_chunks->at(i); + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + const auto available_blk = EXVchunk->available_blks(); + const auto total_blks = EXVchunk->get_total_blks(); + + // the open shard is not sealed, so there is only the shard header for each shard + ASSERT_EQ(available_blk + 1, total_blks); + } + } + + // check vchunk to pchunk for every pg + for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { + auto& hs_pg = HS_PG_map[pg_id]; + // check pg durable entities. only shard header left, and every chunk has a open shard, so + // total_occupied_blk_count is equal to the num of chunks in this pg since each chunk has a shard header. + ASSERT_EQ(hs_pg->pg_sb_->total_occupied_blk_count, pg_chunk_nums[pg_id]); + ASSERT_EQ(hs_pg->durable_entities().total_occupied_blk_count, pg_chunk_nums[pg_id]); + + // after all blobs have been deleted, the pg index table should be empty + ASSERT_EQ(get_valid_blob_count_in_pg(pg_id), 0); + + auto pg_chunks = chunk_selector->get_pg_chunks(pg_id); + for (const auto& shard_id : shard_vec) { + auto chunk_opt = _obj_inst->get_shard_p_chunk_id(shard_id); + ASSERT_TRUE(chunk_opt.has_value()); + auto chunk_id = chunk_opt.value(); + + auto EXVchunk = chunk_selector->get_extend_vchunk(chunk_id); + ASSERT_TRUE(EXVchunk != nullptr); + ASSERT_EQ(EXVchunk->m_state, ChunkState::INUSE); + ASSERT_TRUE(EXVchunk->m_v_chunk_id.has_value()); + auto vchunk_id = EXVchunk->m_v_chunk_id.value(); + + // after gc , pg_chunks should changes, the vchunk shoud change to a new pchunk. 
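// The emergent-GC pattern above fans out one task per chunk and then blocks on the whole batch via
// folly::collectAllUnsafe. Below is a std-only analogue of the same fan-out/fan-in shape, with a
// placeholder task standing in for submit_gc_task(task_priority::emergent, chunk_id); it is a sketch
// of the control flow, not the GC manager's API.
#include <cassert>
#include <future>
#include <vector>

bool run_gc_on_chunk(int chunk_id) {
    (void)chunk_id;          // placeholder for the real per-chunk GC work
    return true;             // report success so the caller can assert on it
}

int main() {
    std::vector<std::future<bool>> futs;
    for (int chunk_id : {11, 12, 13}) {
        futs.emplace_back(std::async(std::launch::async, run_gc_on_chunk, chunk_id));
    }
    for (auto& f : futs) { assert(f.get()); }   // every emergent task must complete successfully
    return 0;
}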
+ ASSERT_EQ(pg_chunks->at(vchunk_id), chunk_id); + } + } + + // TODO:: add more check after we have delete shard implementation +} diff --git a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp index 6269c117..74b97289 100644 --- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp @@ -127,7 +127,6 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) { EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state); EXPECT_EQ(Mi, shard_info.total_capacity_bytes); EXPECT_EQ(Mi, shard_info.available_capacity_bytes); - EXPECT_EQ(0ul, shard_info.deleted_capacity_bytes); EXPECT_EQ(pg_id, shard_info.placement_group); // restart homeobject and check if pg/shard info will be recovered. diff --git a/src/lib/homestore_backend/tests/test_homestore_backend.cpp b/src/lib/homestore_backend/tests/test_homestore_backend.cpp index 94146515..a6e9fc10 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend.cpp @@ -41,10 +41,10 @@ SISL_OPTION_GROUP( (num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("2"), "number"), (num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"), (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), - (is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()-> - default_value("false"), "true or false"), - (enable_http, "", "enable_http", "enable http server or not", - ::cxxopts::value< bool >()->default_value("false"), "true or false")); + (is_restart, "", "is_restart", "the process is restart or the first start", + ::cxxopts::value< bool >()->default_value("false"), "true or false"), + (enable_http, "", "enable_http", "enable http server or not", ::cxxopts::value< bool >()->default_value("false"), + "true or false")); SISL_LOGGING_INIT(homeobject) #define test_options logging, config, homeobject, test_homeobject_repl_common diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp index 429c1be6..396be118 100644 --- a/src/lib/memory_backend/mem_shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -10,7 +10,7 @@ ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t p trace_id_t tid) { (void)tid; auto const now = get_current_timestamp(); - auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, now, now, size_bytes, size_bytes, 0); + auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, now, now, size_bytes, size_bytes); { auto lg = std::scoped_lock(_pg_lock, _shard_lock); auto pg_it = _pg_map.find(pg_owner);