Skip to content

Bump Zenoh to v1.3.2 and improve e2e reliability with HeartbeatSporadic #591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,42 @@
// },
// },
// ],
// /// Overwrite QoS options for messages sent and received from/to the network
// /// This allows more fine grained rules (per network card, etc...) but is
// /// less performant than the publication option above.
// network: [
// {
// /// Optional Id, has to be unique.
// id: "lo0_en0_qos_overwrite",
// // Optional list of interfaces, if not specified, will be applied to all interfaces.
// interfaces: [
// "lo0",
// "en0",
// ],
// /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten.
// /// If absent, the overwrite will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// /// List of message types to apply to.
// messages: [
// "put", // put publications
// "delete" // delete publications
// "query", // get queries
// "reply", // replies to queries
// ],
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flows: ["egress", "ingress"],
// key_exprs: ["test/demo"],
// overwrite: {
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
// priority: "real_time",
// /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged.
// congestion_control: "block",
// /// Optional new express value, if not specified express flag of the messages will stay unchanged.
// express: true
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
Expand Down Expand Up @@ -287,8 +323,11 @@
// /// Optional Id, has to be unique
// "id": "wlan0egress",
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
// /// If absent, the rules will be applied to all interfaces. An empty list is invalid.
// interfaces: [ "wlan0" ],
// /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered.
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flow: ["ingress", "egress"],
Expand Down Expand Up @@ -387,6 +426,12 @@
// "id": "subject3",
// /// An empty subject combination is a wildcard
// },
// {
// "id": "subject4",
// /// link protocols can also be used to identify transports to filter messages on.
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// },
// ],
// /// The policies list associates rules to subjects
// "policies":
Expand All @@ -401,7 +446,7 @@
// },
// {
// "rules": ["rule2"],
// "subjects": ["subject3"],
// "subjects": ["subject3", "subject4"],
// },
// ]
//},
Expand Down
49 changes: 47 additions & 2 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,42 @@
// },
// },
// ],
// /// Overwrite QoS options for messages sent and received from/to the network
// /// This allows more fine grained rules (per network card, etc...) but is
// /// less performant than the publication option above.
// network: [
// {
// /// Optional Id, has to be unique.
// id: "lo0_en0_qos_overwrite",
// // Optional list of interfaces, if not specified, will be applied to all interfaces.
// interfaces: [
// "lo0",
// "en0",
// ],
// /// Optional list of link protocols. Transports with at least one of these links will have their qos overwritten.
// /// If absent, the overwrite will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// /// List of message types to apply to.
// messages: [
// "put", // put publications
// "delete" // delete publications
// "query", // get queries
// "reply", // replies to queries
// ],
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flows: ["egress", "ingress"],
// key_exprs: ["test/demo"],
// overwrite: {
// /// Optional new priority value, if not specified priority of the messages will stay unchanged.
// priority: "real_time",
// /// Optional new congestion control value, if not specified congestion control of the messages will stay unchanged.
// congestion_control: "block",
// /// Optional new express value, if not specified express flag of the messages will stay unchanged.
// express: true
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
Expand Down Expand Up @@ -295,8 +331,11 @@
// /// Optional Id, has to be unique
// "id": "wlan0egress",
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
// /// If absent, the rules will be applied to all interfaces. An empty list is invalid.
// interfaces: [ "wlan0" ],
// /// Optional list of link protocols. Transports with at least one of these links will have their messages filtered.
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flow: ["ingress", "egress"],
Expand Down Expand Up @@ -395,6 +434,12 @@
// "id": "subject3",
// /// An empty subject combination is a wildcard
// },
// {
// "id": "subject4",
// /// link protocols can also be used to identify transports to filter messages on.
// /// If absent, the rules will be applied to all transports. An empty list is invalid.
// link_protocols: [ "tcp", "udp", "tls", "quic", "ws", "serial", "unixsock-stream", "unixpipe", "vsock"],
// },
// ],
// /// The policies list associates rules to subjects
// "policies":
Expand All @@ -409,7 +454,7 @@
// },
// {
// "rules": ["rule2"],
// "subjects": ["subject3"],
// "subjects": ["subject3", "subject4"],
// },
// ]
//},
Expand Down
12 changes: 12 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ namespace rmw_zenoh_cpp
// TODO(yuyuan): SHM, make this configurable
#define SHM_BUF_OK_SIZE 2621440

// Period (ms) of heartbeats sent for detection of lost samples
// by a RELIABLE + TRANSIENT_LOCAL Publisher
#define SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD 500

///=============================================================================
std::shared_ptr<PublisherData> PublisherData::make(
std::shared_ptr<zenoh::Session> session,
Expand Down Expand Up @@ -115,6 +119,14 @@ std::shared_ptr<PublisherData> PublisherData::make(
adv_pub_opts.publisher_detection = true;
adv_pub_opts.cache = AdvancedPublisherOptions::CacheOptions::create_default();
adv_pub_opts.cache->max_samples = adapted_qos_profile.depth;
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
// If RELIABLE + TRANSIENT_LOCAL activate sample miss detection for subscriber
// to detect missed samples and retrieve those from the Publisher cache.
// HeartbeatSporadic is used to prevent excessive background traffic
adv_pub_opts.sample_miss_detection.emplace().heartbeat =
AdvancedPublisherOptions::SampleMissDetectionOptions::HeartbeatSporadic{
SAMPLE_MISS_DETECTION_HEARTBEAT_PERIOD};
}
}

zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
Expand Down
7 changes: 7 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ bool SubscriptionData::init()
// Enable detection of late joiner publishers and query for their historical data.
adv_sub_opts.history->detect_late_publishers = true;
adv_sub_opts.history->max_samples = entity_->topic_info()->qos_.depth;
if (entity_->topic_info()->qos_.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
// Activate recovery of lost samples.
// This requires the Publisher to have sample_miss_detection configured,
// which is the case for a RELIABLE + TRANSIENT_LOCAL Publisher.
adv_sub_opts.recovery.emplace().last_sample_miss_detection =
AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
}
}

std::weak_ptr<SubscriptionData> data_wp = shared_from_this();
Expand Down
39 changes: 18 additions & 21 deletions zenoh_cpp_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,25 @@ find_package(ament_cmake_vendor_package REQUIRED)
set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_udp zenoh/transport_tls")

# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from:
# - Reword SHM warning log about "setting scheduling priority":
# - https://github.com/eclipse-zenoh/zenoh/pull/1778
# - Performances improvements at launch time:
# - https://github.com/eclipse-zenoh/zenoh/pull/1786
# - https://github.com/eclipse-zenoh/zenoh/pull/1789
# - https://github.com/eclipse-zenoh/zenoh/pull/1793
# - Fixed open timeout
# - https://github.com/eclipse-zenoh/zenoh/pull/1796
# - Improve ACL behaviour, notably for S-ROS
# - https://github.com/eclipse-zenoh/zenoh/pull/1781
# - https://github.com/eclipse-zenoh/zenoh/pull/1785
# - https://github.com/eclipse-zenoh/zenoh/pull/1795
# - https://github.com/eclipse-zenoh/zenoh/pull/1806
# - Reduce the number of threads in case of scouting
# - https://github.com/eclipse-zenoh/zenoh-c/pull/937
# - Namespace prefix support
# - https://github.com/eclipse-zenoh/zenoh/pull/1792
# - Fix debug mode crash
# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/432
# - Fix a bug leading to invalid inapropriate "Unable to push non droppable network message" log and transport closure:
# - https://github.com/eclipse-zenoh/zenoh/pull/1855
# - Fix crash with highly chunked keys:
# - https://github.com/eclipse-zenoh/zenoh/pull/1826
# - Resolve issue with closing the Session in atexit:
# - https://github.com/eclipse-zenoh/zenoh/pull/1632
# - Change `Session::close()` implementation so it can be safely waited and awaited in `atexit``
# - https://github.com/eclipse-zenoh/zenoh/pull/1632
# - Add QoS overwrite interceptor allowing for instance a Router to be configured to change QoS on the fly
# - https://github.com/eclipse-zenoh/zenoh/pull/1825
# - Add link protocols as subject to interceptors (access_control, downsampling or qos overwrite):
# - https://github.com/eclipse-zenoh/zenoh/pull/1850
# - Add new non periodic last sample miss detection mechanism for Advanced Publisher:
# - https://github.com/eclipse-zenoh/zenoh/pull/1861
# - Improve tracing for better analysis on the system like rmw_zenoh
# - https://github.com/eclipse-zenoh/zenoh/pull/1844
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION e6a1971139f405f7887bf5bb54f0efe402123032
VCS_VERSION ffa4bddc947f7ed6c0e3b4546205dd1b73e7df81
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand All @@ -50,7 +47,7 @@ ament_export_dependencies(zenohc)

ament_vendor(zenoh_cpp_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp
VCS_VERSION 8ad67f6c7a9031acd437c8739bbc8ddab0ca8173
VCS_VERSION 868fdad0e7418e8f8cb96e94c89a3aed05905e63
CMAKE_ARGS
-DZENOHCXX_ZENOHC=OFF
)
Expand Down