From a2795161a31573dc00d8f7e43dc3722971836734 Mon Sep 17 00:00:00 2001 From: laihui Date: Fri, 30 May 2025 16:33:51 +0800 Subject: [PATCH 1/5] introduce dummy tablet in each be to simplify two-step close to one-step Co-authored-by: zclllyybb --- be/src/exec/tablet_info.cpp | 3 +- be/src/runtime/load_stream.cpp | 7 +- be/src/runtime/tablets_channel.cpp | 5 +- be/src/vec/sink/load_stream_map_pool.cpp | 16 ++++ be/src/vec/sink/load_stream_map_pool.h | 4 + be/src/vec/sink/load_stream_stub.cpp | 16 +++- be/src/vec/sink/load_stream_stub.h | 5 +- be/src/vec/sink/writer/vtablet_writer.cpp | 21 ++++- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 79 ++++++++++++---- be/src/vec/sink/writer/vtablet_writer_v2.h | 4 + .../apache/doris/planner/OlapTableSink.java | 93 +++++++++---------- gensrc/proto/internal_service.proto | 4 + gensrc/thrift/Descriptors.thrift | 4 + .../sql/two_instance_correctness.out | 3 + .../sql/two_instance_correctness.groovy | 32 +++++++ 15 files changed, 216 insertions(+), 80 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 38fa9242f8bbc7..87f8aa64a80b4d 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -498,7 +498,8 @@ Status VOlapTablePartitionParam::init() { RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); - if (!_t_param.partitions_is_fake) { + bool partition_is_fake = t_part.__isset.partition_is_fake && t_part.partition_is_fake; + if (!_t_param.partitions_is_fake && !partition_is_fake) { if (_is_in_partition) { for (auto& in_key : part->in_keys) { _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index bb79504e3555ff..91076ee9009b4d 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -453,7 +453,12 @@ LoadStream::~LoadStream() { Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = 
request->txn_id(); _total_streams = static_cast(request->total_streams()); - _is_incremental = (_total_streams == 0); + bool auto_partition_one_step_close = request->has_auto_partition_one_step_close() + ? request->auto_partition_one_step_close() + : false; + if (!auto_partition_one_step_close) { + _is_incremental = (_total_streams == 0); + } _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request->schema())); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index acab5e12935ae3..44d4717a87b29c 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -154,7 +154,10 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { * then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic * and also need same number of senders' close to close. but will not hang. */ - if (_open_by_incremental) { + bool auto_partition_one_step_close = request.has_auto_partition_one_step_close() + ? 
request.auto_partition_one_step_close() + : false; + if (!auto_partition_one_step_close && _open_by_incremental) { DCHECK(_num_remaining_senders == 0) << _num_remaining_senders; } else { _num_remaining_senders = max_sender; diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 24a1cb77a489cc..87ec0f5ac2bb95 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -126,6 +126,22 @@ void LoadStreamMap::close_load(bool incremental) { } } +void LoadStreamMap::close_load_all_streams() { + for (auto& [dst_id, streams] : _streams_for_node) { + std::vector tablets_to_commit; + const auto& tablets = _tablets_to_commit[dst_id]; + tablets_to_commit.reserve(tablets.size()); + for (const auto& [tablet_id, tablet] : tablets) { + tablets_to_commit.push_back(tablet); + tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); + } + auto st = streams->close_load(tablets_to_commit); + if (!st.ok()) { + LOG(WARNING) << "close_load for all streams failed: " << st << ", load_id=" << _load_id; + } + } +} + LoadStreamMapPool::LoadStreamMapPool() = default; LoadStreamMapPool::~LoadStreamMapPool() = default; diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index bdf98ca8f61df3..2900d4c73e0b4f 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -98,6 +98,10 @@ class LoadStreamMap { // only call this method after release() returns true. void close_load(bool incremental); + // send CLOSE_LOAD to all streams, return ERROR if any. + // only call this method after release() returns true. 
+ void close_load_all_streams(); + private: const UniqueId _load_id; const int64_t _src_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 0126f84f6c3ccf..342c33a2090200 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -146,7 +146,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - int64_t idle_timeout_ms, bool enable_profile) { + int64_t idle_timeout_ms, bool enable_profile, + bool auto_partition_one_step_close) { std::unique_lock lock(_open_mutex); if (_is_init.load()) { return _status; @@ -170,7 +171,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, request.set_src_id(_src_id); request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); - if (_is_incremental) { + if (_is_incremental && !auto_partition_one_step_close) { request.set_total_streams(0); } else if (total_streams > 0) { request.set_total_streams(total_streams); @@ -178,6 +179,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, _status = Status::InternalError("total_streams should be greator than 0"); return _status; } + if (auto_partition_one_step_close) { + request.set_auto_partition_one_step_close(auto_partition_one_step_close); + } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { @@ -506,17 +510,19 @@ Status LoadStreamStubs::open(BrpcClientCache* client_cache const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - int64_t idle_timeout_ms, bool enable_profile) { + int64_t idle_timeout_ms, bool enable_profile, + bool auto_partition_one_step_close) { bool get_schema = true; auto status = Status::OK(); for (auto& stream : _streams) { Status st; if (get_schema) 
{ st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema, - total_streams, idle_timeout_ms, enable_profile); + total_streams, idle_timeout_ms, enable_profile, + auto_partition_one_step_close); } else { st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams, - idle_timeout_ms, enable_profile); + idle_timeout_ms, enable_profile, auto_partition_one_step_close); } if (st.ok()) { get_schema = false; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 5c3ca02272d8e1..42b89dd6c19fe8 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -127,7 +127,7 @@ class LoadStreamStub : public std::enable_shared_from_this { Status open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - int64_t idle_timeout_ms, bool enable_profile); + int64_t idle_timeout_ms, bool enable_profile, bool auto_partition_one_step_close); // for mock this class in UT #ifdef BE_TEST @@ -266,6 +266,7 @@ class LoadStreamStub : public std::enable_shared_from_this { std::unordered_map _failed_tablets; bool _is_incremental = false; + bool _auto_partition_one_step_close = false; }; // a collection of LoadStreams connect to the same node @@ -285,7 +286,7 @@ class LoadStreamStubs { Status open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, - int64_t idle_timeout_ms, bool enable_profile); + int64_t idle_timeout_ms, bool enable_profile, bool auto_partition_one_step_close); bool is_incremental() const { return _is_incremental; } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 771cbb03b42931..0a2a30bcb378dd 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp 
@@ -125,7 +125,14 @@ Status IndexChannel::init(RuntimeState* state, const std::vector_t_sink.olap_table_sink.partition.__isset + .auto_partition_one_step_close && + _parent->_t_sink.olap_table_sink.partition.auto_partition_one_step_close) { + return Status::InternalError( + "incremental channel should already initialized"); + } else { + _has_inc_node = true; + } } VLOG_CRITICAL << "init new node for instance " << _parent->_sender_id << ", node id:" << replica_node_id << ", incremantal:" << incremental; @@ -445,6 +452,11 @@ void VNodeChannel::_open_internal(bool is_incremental) { request->set_workload_group_id(_wg_id); } + if (_parent->_t_sink.olap_table_sink.partition.__isset.auto_partition_one_step_close) { + request->set_auto_partition_one_step_close( + _parent->_t_sink.olap_table_sink.partition.auto_partition_one_step_close); + } + auto open_callback = DummyBrpcCallback::create_shared(); auto open_closure = AutoReleaseClosure< PTabletWriterOpenRequest, @@ -1465,7 +1477,10 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status // when they all closed, we are sure all Writer of instances called _do_try_close. that means no new channel // will be opened. the refcount of recievers will be monotonically decreasing. then we are safe to close all // our channels. 
- if (index_channel->has_incremental_node_channel()) { + bool auto_partition_one_step_close = + _t_sink.olap_table_sink.partition.__isset.auto_partition_one_step_close && + _t_sink.olap_table_sink.partition.auto_partition_one_step_close; + if (index_channel->has_incremental_node_channel() && !auto_partition_one_step_close) { if (!status.ok()) { break; } @@ -1519,7 +1534,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status *ch); } }); - } else { // not has_incremental_node_channel + } else { VLOG_TRACE << _sender_id << " has no incremental channels " << _txn_id; index_channel->for_each_node_channel( [&index_channel, &status](const std::shared_ptr& ch) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 8652127d88f768..1e19ca26b42459 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -101,7 +101,14 @@ Status VTabletWriterV2::_incremental_open_streams( tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); if (!_load_stream_map->contains(node)) { - new_backends.insert(node); + if (_auto_partition_one_step_close) { + return Status::InternalError( + "should not happen, node not in load_stream_map, node={}, " + "tablet_id={}, partition_id={}", + node, tablet_id, partition->id); + } else { + new_backends.insert(node); + } } _tablets_for_node[node].emplace(tablet_id, tablet); if (known_indexes.contains(index.index_id)) [[likely]] { @@ -250,6 +257,9 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create( _load_id, _backend_id, _stream_per_node, _num_local_sink); + if (table_sink.partition.__isset.auto_partition_one_step_close) { + _auto_partition_one_step_close = table_sink.partition.auto_partition_one_step_close; + } return Status::OK(); } @@ -310,7 +320,7 @@ Status 
VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs { tablets_for_schema.clear(); }); auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, - _state->enable_profile()); + _state->enable_profile(), _auto_partition_one_step_close); if (!st.ok()) { LOG(WARNING) << "failed to open stream to backend " << dst_id << ", load_id=" << print_id(_load_id) << ", err=" << st; @@ -641,25 +651,34 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD on all non-incremental streams if this is the last sink - if (is_last_sink) { - _load_stream_map->close_load(false); - } + if (!_auto_partition_one_step_close) { + // send CLOSE_LOAD on all non-incremental streams if this is the last sink + if (is_last_sink) { + _load_stream_map->close_load(false); + } - // close_wait on all non-incremental streams, even if this is not the last sink. - // because some per-instance data structures are now shared among all sinks - // due to sharing delta writers and load stream stubs. - RETURN_IF_ERROR(_close_wait(false)); + // close_wait on all non-incremental streams, even if this is not the last sink. + // because some per-instance data structures are now shared among all sinks + // due to sharing delta writers and load stream stubs. + RETURN_IF_ERROR(_close_wait(false)); - // send CLOSE_LOAD on all incremental streams if this is the last sink. - // this must happen after all non-incremental streams are closed, - // so we can ensure all sinks are in close phase before closing incremental streams. - if (is_last_sink) { - _load_stream_map->close_load(true); - } + // send CLOSE_LOAD on all incremental streams if this is the last sink. 
+ // this must happen after all non-incremental streams are closed, + // so we can ensure all sinks are in close phase before closing incremental streams. + if (is_last_sink) { + _load_stream_map->close_load(true); + } - // close_wait on all incremental streams, even if this is not the last sink. - RETURN_IF_ERROR(_close_wait(true)); + // close_wait on all incremental streams, even if this is not the last sink. + RETURN_IF_ERROR(_close_wait(true)); + } else { + // send CLOSE_LOAD on all streams if this is the last sink. + if (is_last_sink) { + _load_stream_map->close_load_all_streams(); + } + // close_wait on all streams, even if this is not the last sink. + RETURN_IF_ERROR(_close_wait_all_streams()); + } // calculate and submit commit info if (is_last_sink) { @@ -733,6 +752,30 @@ Status VTabletWriterV2::_close_wait(bool incremental) { return st; } +Status VTabletWriterV2::_close_wait_all_streams() { + SCOPED_TIMER(_close_load_timer); + auto st = _load_stream_map->for_each_st([this](int64_t dst_id, + LoadStreamStubs& streams) -> Status { + int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + auto st = streams.close_wait(_state, remain_ms); + if (!st.ok()) { + LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id + << ", load_id=" << print_id(_load_id) << ": " << st; + } + return st; + }); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); + } + return st; +} + void VTabletWriterV2::_calc_tablets_to_commit() { LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id; diff --git 
a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index e328513774881c..1ea173e7f20ba9 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -149,6 +149,8 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _close_wait(bool incremental); + Status _close_wait_all_streams(); + void _cancel(Status status); std::shared_ptr _mem_tracker; @@ -230,6 +232,8 @@ class VTabletWriterV2 final : public AsyncResultWriter { VRowDistribution _row_distribution; // reuse to avoid frequent memory allocation and release. std::vector _row_part_tablet_ids; + + bool _auto_partition_one_step_close = false; }; } // namespace vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 07178c7a5b8371..b60d325f73337f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -587,16 +587,16 @@ private PartitionItem createDummyPartitionItem(PartitionType partType) throws Us } } - private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer, + private void createDummyPartition(long dbId, OlapTable table, Analyzer analyzer, TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType) throws UserException { partitionParam.setEnableAutomaticPartition(true); - // these partitions only use in locations. not find partition. 
- partitionParam.setPartitionsIsFake(true); // set columns - for (Column partCol : partitionInfo.getPartitionColumns()) { - partitionParam.addToPartitionColumns(partCol.getName()); + if (partitionIds.isEmpty()) { + for (Column partCol : partitionInfo.getPartitionColumns()) { + partitionParam.addToPartitionColumns(partCol.getName()); + } } int partColNum = partitionInfo.getPartitionColumns().size(); @@ -611,6 +611,7 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table fakePartition.setNumBuckets(1); } fakePartition.setIsMutable(true); + fakePartition.setPartitionIsFake(true); DistributionInfo distInfo = table.getDefaultDistributionInfo(); partitionParam.setDistributedColumns(getDistColumns(distInfo)); @@ -632,20 +633,18 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table } partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); } - - return partitionParam; } - private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, + private void createDummyPartition(long dbId, OlapTable table, TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType) throws UserException { partitionParam.setEnableAutomaticPartition(true); - // these partitions only use in locations. not find partition. 
- partitionParam.setPartitionsIsFake(true); // set columns - for (Column partCol : partitionInfo.getPartitionColumns()) { - partitionParam.addToPartitionColumns(partCol.getName()); + if (partitionIds.isEmpty()) { + for (Column partCol : partitionInfo.getPartitionColumns()) { + partitionParam.addToPartitionColumns(partCol.getName()); + } } int partColNum = partitionInfo.getPartitionColumns().size(); @@ -660,6 +659,7 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table fakePartition.setNumBuckets(1); } fakePartition.setIsMutable(true); + fakePartition.setPartitionIsFake(true); DistributionInfo distInfo = table.getDefaultDistributionInfo(); partitionParam.setDistributedColumns(getDistColumns(distInfo)); @@ -672,8 +672,6 @@ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table } partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(partitionExprs)); } - - return partitionParam; } public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) @@ -681,15 +679,21 @@ public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Anal TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); PartitionInfo partitionInfo = table.getPartitionInfo(); boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); + if (enableAutomaticPartition) { + partitionParam.setAutoPartitionOneStepClose(true); + } PartitionType partType = table.getPartitionInfo().getType(); partitionParam.setDbId(dbId); partitionParam.setTableId(table.getId()); partitionParam.setVersion(0); partitionParam.setPartitionType(partType.toThrift()); - // create shadow partition for empty auto partition table. only use in this load. - if (enableAutomaticPartition && partitionIds.isEmpty()) { - return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType); + // create shadow partition for auto partition table. only use in this load. 
+ if (enableAutomaticPartition) { + createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType); + if (partitionIds.isEmpty()) { + return partitionParam; + } } switch (partType) { @@ -815,15 +819,21 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); PartitionInfo partitionInfo = table.getPartitionInfo(); boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); + if (enableAutomaticPartition) { + partitionParam.setAutoPartitionOneStepClose(true); + } PartitionType partType = table.getPartitionInfo().getType(); partitionParam.setDbId(dbId); partitionParam.setTableId(table.getId()); partitionParam.setVersion(0); partitionParam.setPartitionType(partType.toThrift()); - // create shadow partition for empty auto partition table. only use in this load. - if (enableAutomaticPartition && partitionIds.isEmpty()) { - return createDummyPartition(dbId, table, partitionParam, partitionInfo, partType); + // create shadow partition for auto partition table. only use in this load. 
+ if (enableAutomaticPartition) { + createDummyPartition(dbId, table, partitionParam, partitionInfo, partType); + if (partitionIds.isEmpty()) { + return partitionParam; + } } switch (partType) { @@ -972,48 +982,33 @@ public static void setPartitionKeys(TOlapTablePartition tPartition, PartitionIte } } - public List createDummyLocation(OlapTable table) throws UserException { - TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); - TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); - + public void createDummyLocation(OlapTable table, TOlapTableLocationParam locationParam) throws UserException { final long fakeTabletId = 0; SystemInfoService clusterInfo = Env.getCurrentSystemInfo(); List aliveBe = clusterInfo.getAllBackendIds(true); if (aliveBe.isEmpty()) { throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster"); } + // By planning a dummy tablet in each BE, + // one-step close wait is ensured that multiple senders have sent all the data. 
for (int i = 0; i < table.getIndexNumber(); i++) { - // only one fake tablet here - if (singleReplicaLoad) { - Long[] nodes = aliveBe.toArray(new Long[0]); - List slaveBe = aliveBe; - - Random random = new SecureRandom(); - int masterNode = random.nextInt(nodes.length); - locationParam.addToTablets(new TTabletLocation(fakeTabletId, - Arrays.asList(nodes[masterNode]))); - - slaveBe.remove(masterNode); - slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId, - slaveBe)); - } else { - locationParam.addToTablets(new TTabletLocation(fakeTabletId, - Arrays.asList(aliveBe.get(0)))); // just one fake location is enough - - LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0)); - } + locationParam.addToTablets(new TTabletLocation(fakeTabletId, aliveBe)); + LOG.info("created dummy location tablet_id={}, be_ids={}", fakeTabletId, aliveBe); } - - return Arrays.asList(locationParam, slaveLocationParam); } private List createLocation(long dbId, OlapTable table) throws UserException { - if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) { - return createDummyLocation(table); - } - TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); + + // create dummy location for auto partition table + if (table.getPartitionInfo().enableAutomaticPartition()) { + createDummyLocation(table, locationParam); + if (partitionIds.isEmpty()) { + return Arrays.asList(locationParam, slaveLocationParam); + } + } + // BE id -> path hash Multimap allBePathsMap = HashMultimap.create(); for (long partitionId : partitionIds) { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 2b5d935d7f3272..13602e211cc50f 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -109,6 +109,8 @@ message PTabletWriterOpenRequest { optional string storage_vault_id = 18; optional 
int32 sender_id = 19; optional int64 workload_group_id = 20; + // for upgrade compatibility + optional bool auto_partition_one_step_close = 21; }; message PTabletWriterOpenResult { @@ -916,6 +918,8 @@ message POpenLoadStreamRequest { optional bool enable_profile = 6 [default = false]; optional int64 total_streams = 7; optional int64 idle_timeout_ms = 8; + // for upgrade compatibility + optional bool auto_partition_one_step_close = 9; } message PTabletSchemaWithIndex { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index d82750c3985ccc..5013e9b2710f92 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -197,6 +197,7 @@ struct TOlapTablePartition { 10: optional bool is_default_partition; // only used in random distribution scenario to make data distributed even 11: optional i64 load_tablet_idx + 12: optional bool partition_is_fake = false } struct TOlapTablePartitionParam { @@ -221,7 +222,10 @@ struct TOlapTablePartitionParam { // insert overwrite partition(*) 11: optional bool enable_auto_detect_overwrite 12: optional i64 overwrite_group_id + // deprecated, use partitions.partition_is_fake instead 13: optional bool partitions_is_fake = false + // for upgrade compatibility + 14: optional bool auto_partition_one_step_close } struct TOlapTableIndex { diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out index 4ee136aef2b9c5..2f1e551f7f48b8 100644 --- a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out +++ b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out @@ -2,3 +2,6 @@ -- !sql -- 2 +-- !sql -- +2 + diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy index c9f2f04aab31c5..e5ec6b116c5e53 100644 --- 
a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy +++ b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy @@ -18,6 +18,7 @@ suite("two_instance_correctness") { // finish time of instances have diff + // case1: test enable_memtable_on_sink_node sql "DROP TABLE IF EXISTS two_bkt;" sql """ create table two_bkt( @@ -42,4 +43,35 @@ suite("two_instance_correctness") { sql " insert into two_bkt_dest select * from two_bkt; " qt_sql " select count(distinct k0) from two_bkt_dest; " + + // case2: test enable_memtable_on_sink_node is false + sql """ set enable_memtable_on_sink_node=false """ + try { + sql "DROP TABLE IF EXISTS two_bkt;" + sql """ + create table two_bkt( + k0 date not null + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 2 + properties("replication_num" = "1"); + """ + + sql """ insert into two_bkt values ("2012-12-11"); """ + sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """ + + sql " DROP TABLE IF EXISTS two_bkt_dest; " + sql """ + create table two_bkt_dest( + k0 date not null + ) + AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) () + DISTRIBUTED BY HASH(`k0`) BUCKETS 10 + properties("replication_num" = "1"); + """ + sql " insert into two_bkt_dest select * from two_bkt; " + + qt_sql " select count(distinct k0) from two_bkt_dest; " + } finally { + sql """ set enable_memtable_on_sink_node=true """ + } } From e6946533927e811038373c872ffa38599c166139 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 28 May 2025 17:28:22 +0800 Subject: [PATCH 2/5] introduce quorum success write to tolerate slow node(part I) --- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/vec/sink/writer/vtablet_writer.cpp | 205 ++++++++++++++++------ be/src/vec/sink/writer/vtablet_writer.h | 87 +++++++-- 4 files changed, 234 insertions(+), 64 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index af4811e00a3bb3..c394d4741682ed 100644 --- 
a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1496,6 +1496,9 @@ DEFINE_mInt32(load_trigger_compaction_version_percent, "66"); DEFINE_mInt64(base_compaction_interval_seconds_since_last_operation, "86400"); DEFINE_mBool(enable_compaction_pause_on_high_memory, "true"); +DEFINE_mDouble(max_wait_time_multiplier, "0.5"); +DEFINE_mInt32(load_timeout_remaining_seconds, "30"); + DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false"); DEFINE_mBool(enable_update_delete_bitmap_kv_check_core, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 4505d4fe426a3d..9a9effb7b4bde1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1570,6 +1570,9 @@ DECLARE_mInt32(load_trigger_compaction_version_percent); DECLARE_mInt64(base_compaction_interval_seconds_since_last_operation); DECLARE_mBool(enable_compaction_pause_on_high_memory); +DECLARE_mDouble(max_wait_time_multiplier); +DECLARE_mInt32(load_timeout_remaining_seconds); + DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently); DECLARE_mBool(enable_update_delete_bitmap_kv_check_core); diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 0a2a30bcb378dd..0d22c1b1b16f37 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -278,6 +279,25 @@ Status IndexChannel::check_tablet_filtered_rows_consistency() { return Status::OK(); } +bool IndexChannel::quorum_success() { + if (_write_tablets.empty()) { + return false; + } + const int num_replicas = _parent->_num_replicas; + const int quorum = num_replicas / 2 + 1; + for (const auto& tablet : _write_tablets) { + // check if all write tablets has received rows from quorum number of nodes + auto it = _tablets_received_rows.find(tablet); + if (it == _tablets_received_rows.end()) { + return false; + } + if (it->second.size() < 
quorum) { + return false; + } + } + return true; +} + static Status none_of(std::initializer_list vars) { bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); Status st = Status::OK(); @@ -566,6 +586,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) for (auto tablet_id : payload->second) { _cur_add_block_request->add_tablet_ids(tablet_id); } + _index_channel->update_node_channel_write_bytes(_node_id, _cur_mutable_block->bytes()); if (_cur_mutable_block->rows() >= _batch_size || _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) { @@ -967,20 +988,28 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } } - // Waiting for finished until _add_batches_finished changed by rpc's finished callback. - // it may take a long time, so we couldn't set a timeout - // For pipeline engine, the close is called in async writer's process block method, - // so that it will not block pipeline thread. - while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { - bthread_usleep(1000); - } - VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; - _close_time_ms = UnixMillis() - _close_time_ms; - if (state->is_cancelled()) { _cancel_with_msg(state->cancel_reason().to_string()); } + return Status::OK(); +} + +// Waiting for finished until _add_batches_finished changed by rpc's finished callback. +// it may take a long time, so we couldn't set a timeout +// For pipeline engine, the close is called in async writer's process block method, +// so that it will not block pipeline thread. 
+bool VNodeChannel::close_finish(RuntimeState* state) { + if (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { + return false; + } + VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; + return true; +} + +Status VNodeChannel::after_close_handle(RuntimeState* state) { + _close_time_ms = UnixMillis() - _close_time_ms; + if (_add_batches_finished) { _close_check(); _state->add_tablet_commit_infos(_tablet_commit_infos); @@ -988,6 +1017,9 @@ Status VNodeChannel::close_wait(RuntimeState* state) { _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id); + _index_channel->set_node_channel_speed( + _node_id, _index_channel->get_node_channel_write_bytes(_node_id) / + (UnixMillis() - _index_channel->get_start_time())); std::lock_guard l(_closed_lock); // only when normal close, we set _is_closed to true. @@ -1119,6 +1151,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf); for (const auto& index_channel : _channels) { + index_channel->set_start_time(UnixMillis()); index_channel->for_each_node_channel([&index_channel]( const std::shared_ptr& ch) { auto st = ch->open_wait(); @@ -1580,16 +1613,7 @@ Status VTabletWriter::close(Status exec_status) { auto status = Status::OK(); // BE id -> add_batch method counter std::unordered_map node_add_batch_counter_map; - int64_t serialize_batch_ns = 0; - int64_t queue_push_lock_ns = 0; - int64_t actual_consume_ns = 0; - int64_t total_add_batch_exec_time_ns = 0; - int64_t max_add_batch_exec_time_ns = 0; - int64_t total_wait_exec_time_ns = 0; - int64_t max_wait_exec_time_ns = 0; - int64_t total_add_batch_num = 0; - int64_t num_node_channels = 0; - VNodeChannelStat channel_stat; + WriterStats writer_stats; for (const auto& index_channel : _channels) { if (!status.ok()) { 
@@ -1598,11 +1622,7 @@ Status VTabletWriter::close(Status exec_status) { int64_t add_batch_exec_time = 0; int64_t wait_exec_time = 0; index_channel->for_each_node_channel( - [this, &index_channel, &status, &node_add_batch_counter_map, - &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, &actual_consume_ns, - &total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns, - &wait_exec_time, - &total_add_batch_num](const std::shared_ptr& ch) { + [this, &index_channel, &status](const std::shared_ptr& ch) { if (!status.ok() || (ch->is_closed() && !ch->is_cancelled())) { return; } @@ -1613,13 +1633,93 @@ Status VTabletWriter::close(Status exec_status) { status = cancel_channel_and_check_intolerable_failure( std::move(status), s.to_string(), *index_channel, *ch); } - ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, - &channel_stat, &queue_push_lock_ns, &actual_consume_ns, - &total_add_batch_exec_time_ns, &add_batch_exec_time, - &total_wait_exec_time_ns, &wait_exec_time, - &total_add_batch_num); }); + std::unordered_set unfinished_node_channel_ids; + for (auto& it : index_channel->get_node_channels()) { + unfinished_node_channel_ids.insert(it.first); + } + auto check_each_node_channel = + [&](const std::shared_ptr& index_channel, + std::unordered_set& unfinished_node_channel_ids, + std::unordered_map& node_add_batch_counter_map, + WriterStats& writer_stats, Status& status) { + for (auto& it : index_channel->get_node_channels()) { + std::shared_ptr node_channel = it.second; + // If the node channel is not in the unfinished_node_channel_ids, + // it means the node channel is already closed. 
+ if (!unfinished_node_channel_ids.contains(it.first)) { + continue; + } + // check if the node channel is finished or cancelled + if (node_channel->close_finish(_state)) { + unfinished_node_channel_ids.erase(it.first); + auto close_status = node_channel->after_close_handle(_state); + if (!close_status.ok()) { + status = cancel_channel_and_check_intolerable_failure( + std::move(status), close_status.to_string(), + *index_channel, *node_channel); + } + node_channel->time_report(&node_add_batch_counter_map, + &writer_stats); + } + } + }; + + // wait quorum success + while (true) { + check_each_node_channel(index_channel, unfinished_node_channel_ids, + node_add_batch_counter_map, writer_stats, status); + if (!status.ok() || unfinished_node_channel_ids.empty() || + index_channel->quorum_success()) { + break; + } + bthread_usleep(1000 * 10); + } + + // Wait for all node channel to complete as much as possible, + // if wait time is more than max_wait_time_seconds, + // or remaining time is less than load_timeout_remaining_seconds, + // cancel unfinished node channel. 
+ if (status.ok() && !unfinished_node_channel_ids.empty()) { + int64_t max_wait_time_ms = 0; + double avg_speed = index_channel->calculate_avg_node_channel_speed(); + for (auto& id : unfinished_node_channel_ids) { + max_wait_time_ms = std::max( + max_wait_time_ms, + static_cast( + static_cast( + index_channel->get_node_channel_write_bytes(id)) / + avg_speed * (1 + config::max_wait_time_multiplier))); + } + while (true) { + check_each_node_channel(index_channel, unfinished_node_channel_ids, + node_add_batch_counter_map, writer_stats, status); + if (unfinished_node_channel_ids.empty() || !status.ok()) { + break; + } + int64_t load_time_ms = UnixMillis() - index_channel->get_start_time(); + if (load_time_ms > max_wait_time_ms || + _load_channel_timeout_s - load_time_ms < + config::load_timeout_remaining_seconds * 1000) { + // cancel unfinished node channel + std::stringstream unfinished_node_channel_host_str; + for (auto& it : unfinished_node_channel_ids) { + unfinished_node_channel_host_str + << index_channel->get_node_channels()[it]->host() << ","; + index_channel->get_node_channels()[it]->cancel("timeout"); + } + LOG(INFO) << "reach max wait time, cancel unfinished node channel and " + "finish close" + << ", load id: " << print_id(_load_id) << "_txn_id: " << _txn_id + << ", unfinished node channel: " + << unfinished_node_channel_host_str.str(); + break; + } + bthread_usleep(1000 * 10); + } + } + // Due to the non-determinism of compaction, the rowsets of each replica may be different from each other on different // BE nodes. The number of rows filtered in SegmentWriter depends on the historical rowsets located in the correspoding // BE node. 
So we check the number of rows filtered on each succeccful BE to ensure the consistency of the current load @@ -1633,21 +1733,22 @@ Status VTabletWriter::close(Status exec_status) { } } - num_node_channels += index_channel->num_node_channels(); - if (add_batch_exec_time > max_add_batch_exec_time_ns) { - max_add_batch_exec_time_ns = add_batch_exec_time; + writer_stats.num_node_channels += index_channel->num_node_channels(); + if (add_batch_exec_time > writer_stats.max_add_batch_exec_time_ns) { + writer_stats.max_add_batch_exec_time_ns = add_batch_exec_time; } - if (wait_exec_time > max_wait_exec_time_ns) { - max_wait_exec_time_ns = wait_exec_time; + if (wait_exec_time > writer_stats.max_wait_exec_time_ns) { + writer_stats.max_wait_exec_time_ns = wait_exec_time; } } // end for index channels if (status.ok()) { // TODO need to be improved - LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns - << ", total queue_push_lock_ns=" << queue_push_lock_ns - << ", total actual_consume_ns=" << actual_consume_ns - << ", load id=" << print_id(_load_id); + LOG(INFO) << "total mem_exceeded_block_ns=" + << writer_stats.channel_stat.mem_exceeded_block_ns + << ", total queue_push_lock_ns=" << writer_stats.queue_push_lock_ns + << ", total actual_consume_ns=" << writer_stats.actual_consume_ns + << ", load id=" << print_id(_load_id) << ", txn_id=" << _txn_id; COUNTER_SET(_input_rows_counter, _number_input_rows); COUNTER_SET(_output_rows_counter, _number_output_rows); @@ -1658,18 +1759,19 @@ Status VTabletWriter::close(Status exec_status) { COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); COUNTER_SET(_filter_timer, _filter_ns); - COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns); - COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns); - COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns); + 
COUNTER_SET(_append_node_channel_timer, + writer_stats.channel_stat.append_node_channel_ns); + COUNTER_SET(_where_clause_timer, writer_stats.channel_stat.where_clause_ns); + COUNTER_SET(_wait_mem_limit_timer, writer_stats.channel_stat.mem_exceeded_block_ns); COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); - COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); - COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns); - COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns); - COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns); - COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns); - COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns); - COUNTER_SET(_add_batch_number, total_add_batch_num); - COUNTER_SET(_num_node_channels, num_node_channels); + COUNTER_SET(_serialize_batch_timer, writer_stats.serialize_batch_ns); + COUNTER_SET(_non_blocking_send_work_timer, writer_stats.actual_consume_ns); + COUNTER_SET(_total_add_batch_exec_timer, writer_stats.total_add_batch_exec_time_ns); + COUNTER_SET(_max_add_batch_exec_timer, writer_stats.max_add_batch_exec_time_ns); + COUNTER_SET(_total_wait_exec_timer, writer_stats.total_wait_exec_time_ns); + COUNTER_SET(_max_wait_exec_timer, writer_stats.max_wait_exec_time_ns); + COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num); + COUNTER_SET(_num_node_channels, writer_stats.num_node_channels); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + @@ -1806,6 +1908,9 @@ Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input for (const auto& entry : channel_to_payload[i]) { // if this node channel is already failed, this add_row will be skipped // entry.second is a [row -> tablet] mapping + for (const auto& tablet_id : entry.second.second) { + _channels[i]->update_write_tablets(tablet_id); + 
} auto st = entry.first->add_block(block.get(), &entry.second); if (!st.ok()) { _channels[i]->mark_as_failed(entry.first, st.to_string()); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 9f10c01ba8aa5e..025341ad46966b 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -206,6 +206,19 @@ class VNodeChannelStat { int64_t append_node_channel_ns = 0; }; +struct WriterStats { + int64_t serialize_batch_ns = 0; + int64_t queue_push_lock_ns = 0; + int64_t actual_consume_ns = 0; + int64_t total_add_batch_exec_time_ns = 0; + int64_t max_add_batch_exec_time_ns = 0; + int64_t total_wait_exec_time_ns = 0; + int64_t max_wait_exec_time_ns = 0; + int64_t total_add_batch_num = 0; + int64_t num_node_channels = 0; + VNodeChannelStat channel_stat; +}; + // pair using Payload = std::pair, std::vector>; @@ -275,25 +288,25 @@ class VNodeChannel { // 2. just cancel() Status close_wait(RuntimeState* state); + bool close_finish(RuntimeState* state); + + Status after_close_handle(RuntimeState* state); + void cancel(const std::string& cancel_msg); void time_report(std::unordered_map* add_batch_counter_map, - int64_t* serialize_batch_ns, VNodeChannelStat* stat, - int64_t* queue_push_lock_ns, int64_t* actual_consume_ns, - int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns, - int64_t* total_wait_exec_time_ns, int64_t* wait_exec_time_ns, - int64_t* total_add_batch_num) const { + WriterStats* writer_stats) const { (*add_batch_counter_map)[_node_id] += _add_batch_counter; (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms; - *serialize_batch_ns += _serialize_batch_ns; - *stat += _stat; - *queue_push_lock_ns += _queue_push_lock_ns; - *actual_consume_ns += _actual_consume_ns; - *add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000); - *total_add_batch_exec_time_ns += *add_batch_exec_time_ns; - *wait_exec_time_ns = 
(_add_batch_counter.add_batch_wait_execution_time_us * 1000); - *total_wait_exec_time_ns += *wait_exec_time_ns; - *total_add_batch_num += _add_batch_counter.add_batch_num; + writer_stats->serialize_batch_ns += _serialize_batch_ns; + writer_stats->channel_stat += _stat; + writer_stats->queue_push_lock_ns += _queue_push_lock_ns; + writer_stats->actual_consume_ns += _actual_consume_ns; + writer_stats->total_add_batch_exec_time_ns += + (_add_batch_counter.add_batch_execution_time_us * 1000); + writer_stats->total_wait_exec_time_ns += + (_add_batch_counter.add_batch_wait_execution_time_us * 1000); + writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num; } int64_t node_id() const { return _node_id; } @@ -458,9 +471,15 @@ class IndexChannel { int64_t tablet_id = -1); Status check_intolerable_failure(); + bool quorum_success(); + // set error tablet info in runtime state, so that it can be returned to FE. void set_error_tablet_in_state(RuntimeState* state); + std::unordered_map> get_node_channels() { + return _node_channels; + } + size_t num_node_channels() const { return _node_channels.size(); } size_t get_pending_bytes() const { @@ -490,8 +509,39 @@ class IndexChannel { // check whether the rows num filtered by different replicas is consistent Status check_tablet_filtered_rows_consistency(); + void set_start_time(const int64_t& start_time) { _start_time = start_time; } + + int64_t get_start_time() const { return _start_time; } + + void update_node_channel_write_bytes(const int64_t& node_id, const int64_t& bytes) { + _node_channel_write_bytes[node_id] += bytes; + } + + int64_t get_node_channel_write_bytes(const int64_t& node_id) { + return _node_channel_write_bytes[node_id]; + } + + void update_write_tablets(const int64_t& tablet_id) { _write_tablets.insert(tablet_id); } + + void set_node_channel_speed(const int64_t& node_id, const int64_t& speed) { + LOG(INFO) << "set_node_channel_speed" + << ", node_id: " << node_id << ", speed: " << speed; + 
_node_channel_speed[node_id] = speed; + } + + double calculate_avg_node_channel_speed() { + DCHECK(!_node_channel_speed.empty()) << "node_channel_speed is empty"; + double total_speed = 0; + for (const auto& [node_id, speed] : _node_channel_speed) { + total_speed += speed; + } + return total_speed / _node_channel_speed.size(); + } + vectorized::VExprContextSPtr get_where_clause() { return _where_clause; } + int64_t get_index_id() const { return _index_id; } + private: friend class VNodeChannel; friend class VTabletWriter; @@ -530,6 +580,13 @@ class IndexChannel { // rows num filtered by DeltaWriter per tablet, tablet_id -> // used to verify whether the rows num filtered by different replicas is consistent std::map>> _tablets_filtered_rows; + + // key is node_id, value is the bytes written by this node + std::unordered_map _node_channel_write_bytes; + int64_t _start_time = 0; + // key is node_id, value is the speed of this node(bytes/s) + std::unordered_map _node_channel_speed; + std::set _write_tablets; }; } // namespace vectorized } // namespace doris @@ -558,6 +615,8 @@ class VTabletWriter final : public AsyncResultWriter { Status _send_new_partition_batch(); + PUniqueId load_id() const { return _load_id; } + private: friend class VNodeChannel; friend class IndexChannel; From 6f29a6f0b6b9ec9478ffdc41f72d4dc60f877f71 Mon Sep 17 00:00:00 2001 From: laihui Date: Fri, 30 May 2025 22:45:43 +0800 Subject: [PATCH 3/5] introduce quorum success write to tolerate slow node(part II) --- be/src/vec/sink/load_stream_map_pool.h | 9 + be/src/vec/sink/load_stream_stub.cpp | 52 ++--- be/src/vec/sink/load_stream_stub.h | 12 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 206 +++++++++++++++---- be/src/vec/sink/writer/vtablet_writer_v2.h | 15 ++ 5 files changed, 214 insertions(+), 80 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index 2900d4c73e0b4f..a664461e971d76 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ 
b/be/src/vec/sink/load_stream_map_pool.h @@ -102,6 +102,15 @@ class LoadStreamMap { // only call this method after release() returns true. void close_load_all_streams(); + std::unordered_map> get_streams_for_node() { + decltype(_streams_for_node) snapshot; + { + std::lock_guard lock(_mutex); + snapshot = _streams_for_node; + } + return snapshot; + } + private: const UniqueId _load_id; const int64_t _src_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 342c33a2090200..6ea8899350f543 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) { LOG(WARNING) << "stub is not exist when on_closed, " << *this; return; } - std::lock_guard lock(stub->_close_mutex); stub->_is_closed.store(true); - stub->_close_cv.notify_all(); } inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) { @@ -334,37 +332,28 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i return Status::OK(); } -Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { +Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); + *is_closed = true; if (!_is_open.load()) { // we don't need to close wait on non-open streams return Status::OK(); } if (!_is_closing.load()) { + *is_closed = false; return _status; } + if (state->get_query_ctx()->is_cancelled()) { + return state->get_query_ctx()->exec_status(); + } if (_is_closed.load()) { - return _check_cancel(); - } - DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; - std::unique_lock lock(_close_mutex); - auto timeout_sec = timeout_ms / 1000; - while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) { - //the query maybe cancel, so need check after wait 1s - timeout_sec = timeout_sec - 1; - 
LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec - << ", is_closed=" << _is_closed.load() - << ", is_cancelled=" << state->get_query_ctx()->is_cancelled(); - int ret = _close_cv.wait_for(lock, 1000000); - if (ret != 0 && timeout_sec <= 0) { - return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}", - ret, timeout_ms, to_string()); + RETURN_IF_ERROR(_check_cancel()); + if (!_is_eos.load()) { + return Status::InternalError("Stream closed without EOS, {}", to_string()); } + return Status::OK(); } - RETURN_IF_ERROR(_check_cancel()); - if (!_is_eos.load()) { - return Status::InternalError("stream closed without eos, {}", to_string()); - } + *is_closed = false; return Status::OK(); } @@ -378,15 +367,12 @@ void LoadStreamStub::cancel(Status reason) { _cancel_st = reason; _is_cancelled.store(true); } - { - std::lock_guard lock(_close_mutex); - _is_closed.store(true); - _close_cv.notify_all(); - } + _is_closed.store(true); } Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span data) { butil::IOBuf buf; + // append data to buffer size_t header_len = header.ByteSizeLong(); buf.append(reinterpret_cast(&header_len), sizeof(header_len)); buf.append(header.SerializeAsString()); @@ -398,6 +384,9 @@ Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span& tablets_to_comm return status; } -Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) { - MonotonicStopWatch watch; - watch.start(); - for (auto& stream : _streams) { - RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000)); - } - return Status::OK(); -} - } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 42b89dd6c19fe8..d3098df28c9ec8 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -35,6 +35,7 @@ #include // IWYU pragma: no_include #include // IWYU pragma: keep +#include 
#include #include #include @@ -155,7 +156,7 @@ class LoadStreamStub : public std::enable_shared_from_this { // wait remote to close stream, // remote will close stream when it receives CLOSE_LOAD - Status close_wait(RuntimeState* state, int64_t timeout_ms = 0); + Status close_finish_check(RuntimeState* state, bool* is_closed); // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled void cancel(Status reason); @@ -216,6 +217,8 @@ class LoadStreamStub : public std::enable_shared_from_this { _failed_tablets[tablet_id] = reason; } + int64_t bytes_written() const { return _bytes_written; } + private: Status _encode_and_send(PStreamHeader& header, std::span data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); @@ -247,9 +250,7 @@ class LoadStreamStub : public std::enable_shared_from_this { Status _cancel_st; bthread::Mutex _open_mutex; - bthread::Mutex _close_mutex; bthread::Mutex _cancel_mutex; - bthread::ConditionVariable _close_cv; std::mutex _buffer_mutex; std::mutex _send_mutex; @@ -267,6 +268,7 @@ class LoadStreamStub : public std::enable_shared_from_this { bool _is_incremental = false; bool _auto_partition_one_step_close = false; + size_t _bytes_written = 0; }; // a collection of LoadStreams connect to the same node @@ -311,8 +313,6 @@ class LoadStreamStubs { Status close_load(const std::vector& tablets_to_commit); - Status close_wait(RuntimeState* state, int64_t timeout_ms = 0); - std::unordered_set success_tablets() { std::unordered_set s; for (auto& stream : _streams) { @@ -331,6 +331,8 @@ class LoadStreamStubs { return m; } + std::vector> streams() { return _streams; } + private: std::vector> _streams; std::atomic _open_success = false; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 1e19ca26b42459..4a3bdfd138371d 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -494,6 +494,7 @@ Status 
VTabletWriterV2::write(RuntimeState* state, Block& input_block) { // For each tablet, send its input_rows from block to delta writer for (const auto& [tablet_id, rows] : rows_for_tablet) { RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows)); + _write_tablets.insert(tablet_id); } COUNTER_SET(_input_rows_counter, _number_input_rows); @@ -726,54 +727,181 @@ Status VTabletWriterV2::close(Status exec_status) { Status VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); - auto st = _load_stream_map->for_each_st( - [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { - if (streams.is_incremental() != incremental) { - return Status::OK(); - } - int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - auto st = streams.close_wait(_state, remain_ms); - if (!st.ok()) { - LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id - << ", load_id=" << print_id(_load_id) << ": " << st; + auto streams_for_node = _load_stream_map->get_streams_for_node(); + std::unordered_set> unfinished_streams; + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + unfinished_streams.insert(stream); + } + } + Status status; + // First wait for quorum success + while (true) { + RETURN_IF_ERROR(_check_timeout()); + RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); + if (!status.ok() || unfinished_streams.empty()) { + break; + } + bthread_usleep(1000 * 10); + } + return status; +} + +bool VTabletWriterV2::_quorum_success( + const std::unordered_set>& unfinished_streams) { + if (_write_tablets.empty()) { + return false; + 
} + const int num_replicas = _num_replicas; + const int quorum = num_replicas / 2 + 1; + std::unordered_map success_counts; + auto streams_for_node = _load_stream_map->get_streams_for_node(); + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (unfinished_streams.contains(stream)) { + continue; + } + for (auto tablet_id : stream->success_tablets()) { + if (_write_tablets.contains(tablet_id)) { + success_counts[tablet_id]++; } - return st; - }); - if (!st.ok()) { - LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); + } + } } - return st; + for (auto tablet_id : _write_tablets) { + int success = success_counts[tablet_id]; + if (success < quorum) { + return false; + } + } + return true; +} + +double VTabletWriterV2::_calc_max_wait_time_ms( + const std::unordered_map>& streams_for_node, + const std::unordered_set>& unfinished_streams) { + double avg_speed = 0.0; + int64_t total_time_ms = std::max( + 1L, static_cast(UnixMillis() - _timeout_watch.elapsed_time())); + int64_t finished_streams_count = 0; + + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (unfinished_streams.contains(stream)) { + continue; + } + avg_speed += static_cast(stream->bytes_written()) / + static_cast(total_time_ms); + finished_streams_count++; + } + } + DCHECK(finished_streams_count > 0) << "no finished streams"; + avg_speed /= static_cast(finished_streams_count); + + double max_wait_time_ms = 0.0; + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (unfinished_streams.contains(stream)) { + max_wait_time_ms = std::max( + max_wait_time_ms, static_cast(stream->bytes_written()) / avg_speed); + } + } + } + max_wait_time_ms += config::max_wait_time_multiplier * max_wait_time_ms; + + return max_wait_time_ms; } Status VTabletWriterV2::_close_wait_all_streams() { 
SCOPED_TIMER(_close_load_timer); - auto st = _load_stream_map->for_each_st([this](int64_t dst_id, - LoadStreamStubs& streams) -> Status { - int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); + auto streams_for_node = _load_stream_map->get_streams_for_node(); + + std::unordered_set> unfinished_streams; + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + unfinished_streams.insert(stream); } - auto st = streams.close_wait(_state, remain_ms); - if (!st.ok()) { - LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id - << ", load_id=" << print_id(_load_id) << ": " << st; + } + + Status status; + // First wait for quorum success + while (true) { + RETURN_IF_ERROR(_check_timeout()); + RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); + if (_quorum_success(unfinished_streams) || !status.ok() || unfinished_streams.empty()) { + break; } - return st; - }); - if (!st.ok()) { - LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); + bthread_usleep(1000 * 10); } - return st; + + // Then wait for remaining streams as much as possible + if (status.ok() && !unfinished_streams.empty()) { + double max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams); + while (true) { + RETURN_IF_ERROR(_check_timeout()); + RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); + + if (unfinished_streams.empty() || !status.ok()) { + break; + } + + // Check if we should stop waiting + if (static_cast(UnixMillis() - _timeout_watch.elapsed_time()) > + max_wait_time_ms || + 
_state->execution_timeout() * 1000 - _timeout_watch.elapsed_time() < + config::load_timeout_remaining_seconds * 1000) { + std::stringstream unfinished_streams_str; + for (const auto& stream : unfinished_streams) { + unfinished_streams_str << stream->stream_id() << ","; + } + LOG(INFO) << "reach max wait time" + << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id + << ", unfinished streams: " << unfinished_streams_str.str(); + break; + } + bthread_usleep(1000 * 10); + } + } + + if (!status.ok()) { + LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id); + } + return status; +} + +Status VTabletWriterV2::_check_timeout() { + int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + return Status::OK(); +} + +Status VTabletWriterV2::_check_streams_finish( + std::unordered_set>& unfinished_streams, Status& status, + const std::unordered_map>& streams_for_node) { + for (const auto& [dst_id, streams] : streams_for_node) { + for (const auto& stream : streams->streams()) { + if (!unfinished_streams.contains(stream)) { + continue; + } + bool is_closed = false; + auto stream_st = stream->close_finish_check(_state, &is_closed); + if (!stream_st.ok()) { + status = stream_st; + unfinished_streams.erase(stream); + LOG(WARNING) << "close_wait failed: " << stream_st + << ", load_id=" << print_id(_load_id); + } + if (is_closed) { + unfinished_streams.erase(stream); + } + } + } + return status; } void VTabletWriterV2::_calc_tablets_to_commit() { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 1ea173e7f20ba9..25c819b49985bf 100644 --- 
a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -151,6 +151,19 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _close_wait_all_streams(); + bool _quorum_success( + const std::unordered_set>& unfinished_streams); + + Status _check_timeout(); + + Status _check_streams_finish( + std::unordered_set>& unfinished_streams, Status& status, + const std::unordered_map>& streams_for_node); + + double _calc_max_wait_time_ms( + const std::unordered_map>& streams_for_node, + const std::unordered_set>& unfinished_streams); + void _cancel(Status status); std::shared_ptr _mem_tracker; @@ -234,6 +247,8 @@ class VTabletWriterV2 final : public AsyncResultWriter { std::vector _row_part_tablet_ids; bool _auto_partition_one_step_close = false; + + std::unordered_set _write_tablets; }; } // namespace vectorized From 7b2c53d90d0337bed382da7b36c914abc0105d75 Mon Sep 17 00:00:00 2001 From: laihui Date: Sat, 7 Jun 2025 23:56:39 +0800 Subject: [PATCH 4/5] update --- be/src/vec/sink/writer/vtablet_writer.cpp | 280 +++++++++++----------- be/src/vec/sink/writer/vtablet_writer.h | 48 ++-- 2 files changed, 163 insertions(+), 165 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 0d22c1b1b16f37..161ef115d4acbd 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -279,6 +279,110 @@ Status IndexChannel::check_tablet_filtered_rows_consistency() { return Status::OK(); } +static Status cancel_channel_and_check_intolerable_failure(Status status, + const std::string& err_msg, + IndexChannel& ich, VNodeChannel& nch) { + LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg; + ich.mark_as_failed(&nch, err_msg, -1); + // cancel the node channel in best effort + nch.cancel(err_msg); + + // check if index has intolerable failure + if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) 
{ + status = std::move(index_st); + } else if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) { + status = std::move(receive_st); + } else if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) { + status = std::move(filter_st); + } + return status; +} + +Status IndexChannel::close_wait( + RuntimeState* state, WriterStats* writer_stats, + std::unordered_map* node_add_batch_counter_map) { + Status status = Status::OK(); + std::unordered_set unfinished_node_channel_ids; + for (auto& it : _node_channels) { + unfinished_node_channel_ids.insert(it.first); + } + // wait quorum success + while (true) { + status =check_each_node_channel_close(&unfinished_node_channel_ids, + node_add_batch_counter_map, writer_stats, status); + if (!status.ok() || unfinished_node_channel_ids.empty() || quorum_success()) { + break; + } + bthread_usleep(1000 * 10); + } + + // Wait for all node channel to complete as much as possible, + // if wait time is more than max_wait_time_seconds, + // or remaining time is less than load_timeout_remaining_seconds, + // cancel unfinished node channel. 
+ if (status.ok() && !unfinished_node_channel_ids.empty()) { + int64_t max_wait_time_ms = 0; + double avg_speed = calculate_avg_node_channel_speed(); + for (auto& id : unfinished_node_channel_ids) { + max_wait_time_ms = std::max( + max_wait_time_ms, + static_cast(static_cast(get_node_channel_write_bytes(id)) / + avg_speed * (1 + config::max_wait_time_multiplier))); + } + while (true) { + status =check_each_node_channel_close( + &unfinished_node_channel_ids, node_add_batch_counter_map, writer_stats, status); + if (unfinished_node_channel_ids.empty() || !status.ok()) { + break; + } + int64_t load_time_ms = UnixMillis() - get_start_time(); + if (load_time_ms > max_wait_time_ms || + _parent->_load_channel_timeout_s - load_time_ms < + config::load_timeout_remaining_seconds * 1000) { + // cancel unfinished node channel + std::stringstream unfinished_node_channel_host_str; + for (auto& it : unfinished_node_channel_ids) { + unfinished_node_channel_host_str << _node_channels[it]->host() << ","; + _node_channels[it]->cancel("timeout"); + } + LOG(INFO) << "reach max wait time, cancel unfinished node channel and " + "finish close" + << ", load id: " << print_id(_parent->_load_id) + << "_txn_id: " << _parent->_txn_id << ", unfinished node channel: " + << unfinished_node_channel_host_str.str(); + break; + } + bthread_usleep(1000 * 10); + } + } + return status; +} + +Status IndexChannel::check_each_node_channel_close( + std::unordered_set* unfinished_node_channel_ids, + std::unordered_map* node_add_batch_counter_map, + WriterStats* writer_stats, Status status) { + for (auto& it : _node_channels) { + std::shared_ptr node_channel = it.second; + // If the node channel is not in the unfinished_node_channel_ids, + // it means the node channel is already closed. 
+ if (!unfinished_node_channel_ids->contains(it.first)) { + continue; + } + bool node_channel_closed = false; + auto s = it.second->close_wait(_parent->_state, &node_channel_closed); + if (node_channel_closed) { + s = it.second->after_close_handle(_parent->_state, writer_stats, + node_add_batch_counter_map); + } + if (!s.ok()) { + status = cancel_channel_and_check_intolerable_failure(std::move(status), s.to_string(), + *this, *it.second); + } + } + return status; +} + bool IndexChannel::quorum_success() { if (_write_tablets.empty()) { return false; @@ -969,13 +1073,15 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { static_cast(request->release_id()); } -Status VNodeChannel::close_wait(RuntimeState* state) { +Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) { DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { std::thread t(injection_full_gc_fn); t.join(); }); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + *is_closed = true; + auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { if (_cancelled) { @@ -992,22 +1098,22 @@ Status VNodeChannel::close_wait(RuntimeState* state) { _cancel_with_msg(state->cancel_reason().to_string()); } - return Status::OK(); -} - -// Waiting for finished until _add_batches_finished changed by rpc's finished callback. -// it may take a long time, so we couldn't set a timeout -// For pipeline engine, the close is called in async writer's process block method, -// so that it will not block pipeline thread. -bool VNodeChannel::close_finish(RuntimeState* state) { + // Waiting for finished until _add_batches_finished changed by rpc's finished callback. + // it may take a long time, so we couldn't set a timeout + // For pipeline engine, the close is called in async writer's process block method, + // so that it will not block pipeline thread. 
if (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { - return false; + *is_closed = false; + return Status::OK(); } VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; - return true; + return Status::OK(); } -Status VNodeChannel::after_close_handle(RuntimeState* state) { +Status VNodeChannel::after_close_handle( + RuntimeState* state, WriterStats* writer_stats, + std::unordered_map* node_add_batch_counter_map) { + Status st = Status::Error(get_cancel_msg()); _close_time_ms = UnixMillis() - _close_time_ms; if (_add_batches_finished) { @@ -1025,10 +1131,11 @@ Status VNodeChannel::after_close_handle(RuntimeState* state) { // only when normal close, we set _is_closed to true. // otherwise, we will set it to true in cancel(). _is_closed = true; - return Status::OK(); + st = Status::OK(); } - return Status::Error(get_cancel_msg()); + time_report(node_add_batch_counter_map, writer_stats); + return st; } void VNodeChannel::_close_check() { @@ -1435,25 +1542,6 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -static Status cancel_channel_and_check_intolerable_failure(Status status, - const std::string& err_msg, - IndexChannel& ich, VNodeChannel& nch) { - LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg; - ich.mark_as_failed(&nch, err_msg, -1); - // cancel the node channel in best effort - nch.cancel(err_msg); - - // check if index has intolerable failure - if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) { - status = std::move(index_st); - } else if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) { - status = std::move(receive_st); - } else if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) { - status = std::move(filter_st); - } - return status; -} - void VTabletWriter::_cancel_all_channel(Status status) { for (const auto& index_channel : _channels) { 
index_channel->for_each_node_channel([&status](const std::shared_ptr& ch) { @@ -1535,19 +1623,20 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status if (!status.ok()) { break; } - index_channel->for_init_node_channel( - [this, &index_channel, &status](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { - return; - } - auto s = ch->close_wait(_state); - VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() - << "close1 wait finished!"; - if (!s.ok()) { - status = cancel_channel_and_check_intolerable_failure( - std::move(status), s.to_string(), *index_channel, *ch); - } - }); + std::unordered_set unfinished_node_channel_ids; + for (auto& it : index_channel->_node_channels) { + if (!it.second->is_incremental()) { + unfinished_node_channel_ids.insert(it.first); + } + } + while (true) { + status = index_channel->check_each_node_channel_close( + &unfinished_node_channel_ids, nullptr, nullptr, status); + if (!status.ok() || unfinished_node_channel_ids.empty()) { + break; + } + bthread_usleep(1000 * 10); + } if (!status.ok()) { break; } @@ -1621,104 +1710,7 @@ Status VTabletWriter::close(Status exec_status) { } int64_t add_batch_exec_time = 0; int64_t wait_exec_time = 0; - index_channel->for_each_node_channel( - [this, &index_channel, &status](const std::shared_ptr& ch) { - if (!status.ok() || (ch->is_closed() && !ch->is_cancelled())) { - return; - } - // in pipeline, all node channels are done or canceled, will not block. - // no pipeline, close may block waiting. 
- auto s = ch->close_wait(_state); - if (!s.ok()) { - status = cancel_channel_and_check_intolerable_failure( - std::move(status), s.to_string(), *index_channel, *ch); - } - }); - - std::unordered_set unfinished_node_channel_ids; - for (auto& it : index_channel->get_node_channels()) { - unfinished_node_channel_ids.insert(it.first); - } - auto check_each_node_channel = - [&](const std::shared_ptr& index_channel, - std::unordered_set& unfinished_node_channel_ids, - std::unordered_map& node_add_batch_counter_map, - WriterStats& writer_stats, Status& status) { - for (auto& it : index_channel->get_node_channels()) { - std::shared_ptr node_channel = it.second; - // If the node channel is not in the unfinished_node_channel_ids, - // it means the node channel is already closed. - if (!unfinished_node_channel_ids.contains(it.first)) { - continue; - } - // check if the node channel is finished or cancelled - if (node_channel->close_finish(_state)) { - unfinished_node_channel_ids.erase(it.first); - auto close_status = node_channel->after_close_handle(_state); - if (!close_status.ok()) { - status = cancel_channel_and_check_intolerable_failure( - std::move(status), close_status.to_string(), - *index_channel, *node_channel); - } - node_channel->time_report(&node_add_batch_counter_map, - &writer_stats); - } - } - }; - - // wait quorum success - while (true) { - check_each_node_channel(index_channel, unfinished_node_channel_ids, - node_add_batch_counter_map, writer_stats, status); - if (!status.ok() || unfinished_node_channel_ids.empty() || - index_channel->quorum_success()) { - break; - } - bthread_usleep(1000 * 10); - } - - // Wait for all node channel to complete as much as possible, - // if wait time is more than max_wait_time_seconds, - // or remaining time is less than load_timeout_remaining_seconds, - // cancel unfinished node channel. 
- if (status.ok() && !unfinished_node_channel_ids.empty()) { - int64_t max_wait_time_ms = 0; - double avg_speed = index_channel->calculate_avg_node_channel_speed(); - for (auto& id : unfinished_node_channel_ids) { - max_wait_time_ms = std::max( - max_wait_time_ms, - static_cast( - static_cast( - index_channel->get_node_channel_write_bytes(id)) / - avg_speed * (1 + config::max_wait_time_multiplier))); - } - while (true) { - check_each_node_channel(index_channel, unfinished_node_channel_ids, - node_add_batch_counter_map, writer_stats, status); - if (unfinished_node_channel_ids.empty() || !status.ok()) { - break; - } - int64_t load_time_ms = UnixMillis() - index_channel->get_start_time(); - if (load_time_ms > max_wait_time_ms || - _load_channel_timeout_s - load_time_ms < - config::load_timeout_remaining_seconds * 1000) { - // cancel unfinished node channel - std::stringstream unfinished_node_channel_host_str; - for (auto& it : unfinished_node_channel_ids) { - unfinished_node_channel_host_str - << index_channel->get_node_channels()[it]->host() << ","; - index_channel->get_node_channels()[it]->cancel("timeout"); - } - LOG(INFO) << "reach max wait time, cancel unfinished node channel and " - "finish close" - << ", load id: " << print_id(_load_id) << "_txn_id: " << _txn_id - << ", unfinished node channel: " - << unfinished_node_channel_host_str.str(); - break; - } - bthread_usleep(1000 * 10); - } - } + status = index_channel->close_wait(_state, &writer_stats, &node_add_batch_counter_map); // Due to the non-determinism of compaction, the rowsets of each replica may be different from each other on different // BE nodes. 
The number of rows filtered in SegmentWriter depends on the historical rowsets located in the correspoding diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 025341ad46966b..8b285278a93a36 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -286,27 +286,31 @@ class VNodeChannel { // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. just cancel() - Status close_wait(RuntimeState* state); + Status close_wait(RuntimeState* state, bool* is_closed); - bool close_finish(RuntimeState* state); - - Status after_close_handle(RuntimeState* state); + Status after_close_handle( + RuntimeState* state, WriterStats* writer_stats, + std::unordered_map* node_add_batch_counter_map); void cancel(const std::string& cancel_msg); void time_report(std::unordered_map* add_batch_counter_map, WriterStats* writer_stats) const { - (*add_batch_counter_map)[_node_id] += _add_batch_counter; - (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms; - writer_stats->serialize_batch_ns += _serialize_batch_ns; - writer_stats->channel_stat += _stat; - writer_stats->queue_push_lock_ns += _queue_push_lock_ns; - writer_stats->actual_consume_ns += _actual_consume_ns; - writer_stats->total_add_batch_exec_time_ns += - (_add_batch_counter.add_batch_execution_time_us * 1000); - writer_stats->total_wait_exec_time_ns += - (_add_batch_counter.add_batch_wait_execution_time_us * 1000); - writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num; + if (add_batch_counter_map != nullptr) { + (*add_batch_counter_map)[_node_id] += _add_batch_counter; + (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms; + } + if (writer_stats != nullptr) { + writer_stats->serialize_batch_ns += _serialize_batch_ns; + writer_stats->channel_stat += _stat; + writer_stats->queue_push_lock_ns += 
_queue_push_lock_ns; + writer_stats->actual_consume_ns += _actual_consume_ns; + writer_stats->total_add_batch_exec_time_ns += + (_add_batch_counter.add_batch_execution_time_us * 1000); + writer_stats->total_wait_exec_time_ns += + (_add_batch_counter.add_batch_wait_execution_time_us * 1000); + writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num; + } } int64_t node_id() const { return _node_id; } @@ -471,15 +475,19 @@ class IndexChannel { int64_t tablet_id = -1); Status check_intolerable_failure(); + Status close_wait(RuntimeState* state, WriterStats* writer_stats, + std::unordered_map* node_add_batch_counter_map); + + Status check_each_node_channel_close( + std::unordered_set* unfinished_node_channel_ids, + std::unordered_map* node_add_batch_counter_map, + WriterStats* writer_stats, Status status); + bool quorum_success(); // set error tablet info in runtime state, so that it can be returned to FE. void set_error_tablet_in_state(RuntimeState* state); - std::unordered_map> get_node_channels() { - return _node_channels; - } - size_t num_node_channels() const { return _node_channels.size(); } size_t get_pending_bytes() const { @@ -524,8 +532,6 @@ class IndexChannel { void update_write_tablets(const int64_t& tablet_id) { _write_tablets.insert(tablet_id); } void set_node_channel_speed(const int64_t& node_id, const int64_t& speed) { - LOG(INFO) << "set_node_channel_speed" - << ", node_id: " << node_id << ", speed: " << speed; _node_channel_speed[node_id] = speed; } From d3dc7cbf0b27707ba0af1200b671d6bbab622df9 Mon Sep 17 00:00:00 2001 From: laihui Date: Mon, 9 Jun 2025 14:21:32 +0800 Subject: [PATCH 5/5] update --- be/src/vec/sink/writer/vtablet_writer.cpp | 56 ++++++++++++++------ be/src/vec/sink/writer/vtablet_writer.h | 27 ++-------- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 1 - 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 
161ef115d4acbd..ce1c5a463bb90c 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -308,8 +308,8 @@ Status IndexChannel::close_wait( } // wait quorum success while (true) { - status =check_each_node_channel_close(&unfinished_node_channel_ids, - node_add_batch_counter_map, writer_stats, status); + status = check_each_node_channel_close(&unfinished_node_channel_ids, + node_add_batch_counter_map, writer_stats, status); if (!status.ok() || unfinished_node_channel_ids.empty() || quorum_success()) { break; } @@ -321,21 +321,14 @@ Status IndexChannel::close_wait( // or remaining time is less than load_timeout_remaining_seconds, // cancel unfinished node channel. if (status.ok() && !unfinished_node_channel_ids.empty()) { - int64_t max_wait_time_ms = 0; - double avg_speed = calculate_avg_node_channel_speed(); - for (auto& id : unfinished_node_channel_ids) { - max_wait_time_ms = std::max( - max_wait_time_ms, - static_cast(static_cast(get_node_channel_write_bytes(id)) / - avg_speed * (1 + config::max_wait_time_multiplier))); - } + int64_t max_wait_time_ms = _max_wait_time_ms(unfinished_node_channel_ids); while (true) { - status =check_each_node_channel_close( + status = check_each_node_channel_close( &unfinished_node_channel_ids, node_add_batch_counter_map, writer_stats, status); if (unfinished_node_channel_ids.empty() || !status.ok()) { break; } - int64_t load_time_ms = UnixMillis() - get_start_time(); + int64_t load_time_ms = UnixMillis() - _start_time; if (load_time_ms > max_wait_time_ms || _parent->_load_channel_timeout_s - load_time_ms < config::load_timeout_remaining_seconds * 1000) { @@ -376,14 +369,14 @@ Status IndexChannel::check_each_node_channel_close( node_add_batch_counter_map); } if (!s.ok()) { - status = cancel_channel_and_check_intolerable_failure(std::move(status), s.to_string(), + status = cancel_channel_and_check_intolerable_failure(std::move(s), s.to_string(), *this, *it.second); } } return status; } 
-bool IndexChannel::quorum_success() { +bool IndexChannel::_quorum_success() { if (_write_tablets.empty()) { return false; } @@ -402,6 +395,38 @@ bool IndexChannel::quorum_success() { return true; } +int64_t IndexChannel::_max_wait_time_ms( + const std::unordered_set& unfinished_node_channel_ids) { + int64_t elapsed_ms = UnixMillis() - _start_time; + if (elapsed_ms <= 0) { + return 0; + } + + int64_t total_bytes = 0; + for (const auto& [_, bytes] : _node_channel_write_bytes) { + total_bytes += bytes; + } + if (total_bytes <= 0) { + DCHECK(false) << "total_bytes is 0"; + return 0; + } + + double avg_speed = + static_cast(total_bytes) / static_cast(elapsed_ms); // bytes/ms + double multiplier = 1.0 + config::max_wait_time_multiplier; + + int64_t max_wait = 0; + for (int64_t id : unfinished_node_channel_ids) { + int64_t bytes = _node_channel_write_bytes[id]; + int64_t wait = + avg_speed > 0 + ? static_cast(static_cast(bytes) / avg_speed * multiplier) + : 0; + max_wait = std::max(max_wait, wait); + } + return max_wait; +} + static Status none_of(std::initializer_list vars) { bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); Status st = Status::OK(); @@ -1123,9 +1148,6 @@ Status VNodeChannel::after_close_handle( _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id); - _index_channel->set_node_channel_speed( - _node_id, _index_channel->get_node_channel_write_bytes(_node_id) / - (UnixMillis() - _index_channel->get_start_time())); std::lock_guard l(_closed_lock); // only when normal close, we set _is_closed to true. 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 8b285278a93a36..f2c5bf9dc8e022 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -483,8 +483,6 @@ class IndexChannel { std::unordered_map* node_add_batch_counter_map, WriterStats* writer_stats, Status status); - bool quorum_success(); - // set error tablet info in runtime state, so that it can be returned to FE. void set_error_tablet_in_state(RuntimeState* state); @@ -519,31 +517,12 @@ class IndexChannel { void set_start_time(const int64_t& start_time) { _start_time = start_time; } - int64_t get_start_time() const { return _start_time; } - void update_node_channel_write_bytes(const int64_t& node_id, const int64_t& bytes) { _node_channel_write_bytes[node_id] += bytes; } - int64_t get_node_channel_write_bytes(const int64_t& node_id) { - return _node_channel_write_bytes[node_id]; - } - void update_write_tablets(const int64_t& tablet_id) { _write_tablets.insert(tablet_id); } - void set_node_channel_speed(const int64_t& node_id, const int64_t& speed) { - _node_channel_speed[node_id] = speed; - } - - double calculate_avg_node_channel_speed() { - DCHECK(!_node_channel_speed.empty()) << "node_channel_speed is empty"; - double total_speed = 0; - for (const auto& [node_id, speed] : _node_channel_speed) { - total_speed += speed; - } - return total_speed / _node_channel_speed.size(); - } - vectorized::VExprContextSPtr get_where_clause() { return _where_clause; } int64_t get_index_id() const { return _index_id; } @@ -553,6 +532,10 @@ class IndexChannel { friend class VTabletWriter; friend class VRowDistribution; + bool _quorum_success(); + + int64_t _max_wait_time_ms(const std::unordered_set& unfinished_node_channel_ids); + VTabletWriter* _parent = nullptr; int64_t _index_id; vectorized::VExprContextSPtr _where_clause; @@ -590,8 +573,6 @@ class IndexChannel { // key is node_id, value is the bytes written by this node 
std::unordered_map _node_channel_write_bytes; int64_t _start_time = 0; - // key is node_id, value is the speed of this node(bytes/s) - std::unordered_map _node_channel_speed; std::set _write_tablets; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 4a3bdfd138371d..5ca95cb82398b8 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -735,7 +735,6 @@ Status VTabletWriterV2::_close_wait(bool incremental) { } } Status status; - // First wait for quorum success while (true) { RETURN_IF_ERROR(_check_timeout()); RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));