Description
Required information
Operating system:
Ubuntu 18.04 via WSL (Windows 11)
Compiler version:
Clang TSan
Eclipse iceoryx version:
v2.0.3 (Blueberry), master branch
Observed result or behaviour:
During the message exchange between two processes with one publisher and one subscriber per process, the thread sanitizer reports data races. The listener is employed for receiving data.
Potentially related issues are #2046 and #1119.
Expected result or behaviour:
Bidirectional communication without concurrency issues.
Conditions where it occurred / Performed steps:
The issue can be reproduced with the code snippet below. Just let it run for a while (a restart of the application may be necessary) until ThreadSanitizer reports data races.
Questions:
I have a few questions regarding the described problem:
Am I using iceoryx as intended (see the code snippet below)? More precisely, is it allowed to use one publisher and one subscriber per process (I haven't seen this situation in the iceoryx examples)?
Is there a way to fix or circumvent the issue?
Have you experienced any resultant problems?
Detailed description of the problem
Bidirectional communication using the listener leads to data races. The critical parts seem to be the loan() routine of the publisher and the take() routine of the subscriber. Unfortunately, guarding the sending and receiving parts with a mutex is not possible, because the chosen pub/sub options (WAIT_FOR_CONSUMER and BLOCK_PRODUCER) lead to blocking behavior.
Here goes the TSan output:
==================
WARNING: ThreadSanitizer: data race (pid=241)
Write of size 4 at 0x7f6ac40712cc by thread T2 (mutexes: write M0):
#0 iox::concurrent::LoFFLi::push(unsigned int) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32
#1 iox::mepoo::MemPool::freeChunk(void const*) /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:106:24
#2 iox::mepoo::SharedChunk::freeChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:65:47
#3 iox::mepoo::SharedChunk::decrementReferenceCounter() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:58:9
#4 iox::mepoo::SharedChunk::~SharedChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:42:5
#5 iox::popo::ChunkReceiver<iox::popo::ChunkReceiverData<256u, iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>::release(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.inl:108:1
#6 iox::popo::SubscriberPortUser::releaseChunk(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/source/popo/ports/subscriber_port_user.cpp:74:21
#7 void iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>::operator()<RadarObject const>(RadarObject const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/sample_deleter.inl:37:13
#8 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::operator()(void*, RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:34:16
#9 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::__invoke(void*, RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:33:25
#10 iox::cxx::function_ref<void (RadarObject const*)>::operator()(RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:80:12
#11 iox::cxx::unique_ptr<RadarObject const>::reset(RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:111:9
#12 iox::cxx::unique_ptr<RadarObject const>::~unique_ptr() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:65:5
#13 iox::popo::internal::SmartChunkPrivateData<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunkPrivateData() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:60:38
#14 iox::popo::SmartChunk<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunk() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:104:36
#15 iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>::~Sample() /iceoryx/iceoryx_posh/include/iceoryx_posh/popo/sample.hpp:37:7
#16 iox::cxx::internal::call_at_index<0ul, iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::destructor(unsigned long, unsigned char*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant_internal.hpp:101:41
#17 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::call_element_destructor() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:136:9
#18 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~variant() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:128:5
#19 iox::cxx::expected<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~expected() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/expected.hpp:316:34
#20 Peer::onSampleReceivedCallback(iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>) /home/Test_Iceoryx_DataRace.cpp:84:9
#21 iox::popo::internal::TranslateAndCallTypelessCallback<iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::internal::NoType_t>::call(void, void*, void ()()) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/notification_callback.inl:40:5
#22 iox::popo::internal::Event_t::executeCallback() /iceoryx/iceoryx_posh/source/popo/listener.cpp:48:5
#23 auto iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)::operator()(auto) const /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:79
#24 void iox::cxx::forEach<iox::cxx::vector<unsigned short, 256ul>, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)>(auto&, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto) const&) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/helplets.hpp:206:9
#25 iox::popo::ListenerImpl<256ul>::threadLoop() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:9
#26 void std::__invoke_impl<void, void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>>(std::__invoke_memfun_deref, void (iox::popo::ListenerImpl<256ul>::&&)(), iox::popo::ListenerImpl<256ul>&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:74:14
#27 std::__invoke_result<void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>>::type std::__invoke<void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>>(void (iox::popo::ListenerImpl<256ul>::&&)(), iox::popo::ListenerImpl<256ul>&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:96:14
#28 void std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:252:13
#29 std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>>>::operator()() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:259:11
#30 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::)(), iox::popo::ListenerImpl<256ul>*>>>::_M_run() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:210:13
#31 (libstdc++.so.6+0xdf202) (BuildId: c9fa50ce095a2e63a62fe03497f249dbe7dd0a7b)
Previous read of size 4 at 0x7f6ac40712cc by main thread:
#0 iox::concurrent::LoFFLi::pop(unsigned int&) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:63:40
#1 iox::mepoo::MemPool::getChunk() /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:81:24
#2 iox::mepoo::MemoryManager::getChunk(iox::mepoo::ChunkSettings const&) /iceoryx/iceoryx_posh/source/mepoo/memory_manager.cpp:205:67
#3 iox::popo::ChunkSender<iox::popo::ChunkSenderData<8u, iox::popo::ChunkDistributorData<iox::DefaultChunkDistributorConfig, iox::popo::ThreadSafePolicy, iox::popo::ChunkQueuePusher<iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>>>::tryAllocate(iox::popo::UniquePortId, unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl:144:58
#4 iox::popo::PublisherPortUser::tryAllocateChunk(unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/source/popo/ports/publisher_port_user.cpp:47:26
#5 iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loanSample() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:75:26
#6 iox::cxx::expected<iox::popo::Sample<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::AllocationError> iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loan<>() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:41:22
#7 Peer::SendMsg(double) /home/Test_Iceoryx_DataRace.cpp:67:20
#8 main /home/Test_Iceoryx_DataRace.cpp:145:14
Location is global '??' at 0x7f6ac2512000 (iceoryx_mgmt+0x1b5f2cc)
Mutex M0 (0x7ffe60026ff8) created at:
[...]
Thread T2 (tid=245, running) created by main thread at:
[...]
SUMMARY: ThreadSanitizer: data race /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32 in iox::concurrent::LoFFLi::push(unsigned int)
==================
Test program
This small test program, based on the iceoryx examples `icedelivery` and `callbacks`, can be used to reproduce the data race. Start each instance with two command line arguments, reversed between the two processes (for example `./Test_Iceoryx_DataRace One Two` and `./Test_Iceoryx_DataRace Two One`).
#include <iostream>
#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/popo/listener.hpp"
// Console prefixes built from ANSI escape sequences: "\033[32m" / "\033[33m" switch the
// foreground color and "\033[m" resets it again.
// In the code shown here they are only referenced from commented-out logging statements.
constexpr const char GREEN_RIGHT_ARROW[] = "\033[32m->\033[m ";
constexpr const char ORANGE_LEFT_ARROW[] = "\033[33m<-\033[m ";
/// Payload type exchanged between the peers: three double components x, y and z.
struct RadarObject
{
    double x{0.0};
    double y{0.0};
    double z{0.0};

    /// Creates an object with all components set to 0.0.
    RadarObject() noexcept = default;

    /// Creates an object from explicit component values.
    RadarObject(double x, double y, double z) noexcept
        : x{x}
        , y{y}
        , z{z}
    {
    }
};
/// Returns default publisher options with the too-slow-subscriber policy
/// switched to WAIT_FOR_CONSUMER.
iox::popo::PublisherOptions GetPubOptions()
{
    namespace popo = iox::popo;

    popo::PublisherOptions options;
    options.subscriberTooSlowPolicy = popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
    return options;
}
/// Returns default subscriber options with a queue capacity of 10 and the
/// queue-full policy switched to BLOCK_PRODUCER.
iox::popo::SubscriberOptions GetSubOptions()
{
    namespace popo = iox::popo;

    popo::SubscriberOptions options;
    options.queueFullPolicy = popo::QueueFullPolicy::BLOCK_PRODUCER;
    options.queueCapacity = 10U;
    return options;
}
/// One communication endpoint: owns a publisher, a subscriber and a listener.
/// Samples are sent from the caller's thread via SendMsg(); incoming samples are
/// taken inside the callback attached to the listener.
class Peer
{
  public:
    /// Creates publisher and subscriber for the service descriptions
    /// {"Radar", pubTopic, "Object"} and {"Radar", subTopic, "Object"} and attaches
    /// the subscriber's DATA_RECEIVED event to the listener.
    /// Exits the process with EXIT_FAILURE if attaching fails.
    Peer(iox::capro::IdString_t pubTopic, iox::capro::IdString_t subTopic)
        : _publisher({"Radar", pubTopic, "Object"}, GetPubOptions())
        , _subscriber({"Radar", subTopic, "Object"}, GetSubOptions())
    {
        auto abortOnAttachFailure = [](auto) {
            std::cerr << "unable to attach subscriber" << std::endl;
            std::exit(EXIT_FAILURE);
        };

        _listener
            .attachEvent(_subscriber,
                         iox::popo::SubscriberEvent::DATA_RECEIVED,
                         iox::popo::createNotificationCallback(onSampleReceivedCallback))
            .or_else(abortOnAttachFailure);
    }

    /// Loans a sample, writes val into all three components and publishes it.
    /// A failed loan is reported on std::cerr; nothing is sent in that case.
    void SendMsg(double val)
    {
        auto fillAndPublish = [val](auto& sample) {
            sample->x = val;
            sample->y = val;
            sample->z = val;
            sample.publish();
            //std::cout << GREEN_RIGHT_ARROW << "Sent value: " << val << std::endl;
        };
        auto reportLoanFailure = [](auto& error) {
            std::cerr << "Unable to loan sample, error: " << error << std::endl;
        };

        _publisher.loan().and_then(fillAndPublish).or_else(reportLoanFailure);
    }

  private:
    /// Callback registered with the listener: takes a single sample from the
    /// subscriber and lets it go out of scope again right away. Every failure
    /// other than NO_CHUNK_AVAILABLE is reported on std::cout.
    static void onSampleReceivedCallback(iox::popo::Subscriber<RadarObject>* subscriber)
    {
        auto consumeSample = [/* subscriber*/](auto& /*sample*/) {
            //std::cout << ORANGE_LEFT_ARROW << "Received value: " << sample->x << std::endl;
        };
        auto reportReceiveFailure = [](auto& result) {
            if (result == iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE)
            {
                return;
            }
            std::cout << "Error receiving chunk." << std::endl;
        };

        subscriber->take().and_then(consumeSample).or_else(reportReceiveFailure);
    }

    iox::popo::Publisher<RadarObject> _publisher;
    iox::popo::Subscriber<RadarObject> _subscriber;
    // declared after the subscriber it is attached to, hence destroyed before it
    iox::popo::Listener _listener;
};
/// Entry point of the reproduction program.
///
/// Expects exactly two command line arguments: argv[1] is the topic to publish on,
/// argv[2] the topic to subscribe to. The runtime name is the concatenation of both.
/// After setup, samples with an incrementing value are sent in a tight loop until
/// termination is requested.
///
/// @return EXIT_FAILURE on a wrong argument count or when a topic / the combined
///         runtime name does not fit into the fixed-capacity strings,
///         EXIT_SUCCESS after a requested termination.
int main(int argc, char* argv[])
{
    // command line parsing, get peer name for topic construction
    if (argc != 3)
    {
        std::cout << "Only 2 command line arguments (PubSub topics) allowed." << std::endl;
        return EXIT_FAILURE;
    }
    std::cout << "Publisher topic is: " << argv[1] << "." << std::endl;
    std::cout << "Subscriber topic is: " << argv[2] << "." << std::endl;

    constexpr uint64_t MAX_NAME_LENGTH = 100U;

    // conversion char array to cxx::string; unsafe_assign returns false when the
    // input does not fit, which must not be ignored (the topic would stay empty)
    iox::cxx::string<MAX_NAME_LENGTH> topicPub;
    iox::cxx::string<MAX_NAME_LENGTH> topicSub;
    if (!topicPub.unsafe_assign(argv[1]) || !topicSub.unsafe_assign(argv[2]))
    {
        std::cerr << "Each topic must not exceed " << MAX_NAME_LENGTH << " characters." << std::endl;
        return EXIT_FAILURE;
    }

    // initialize runtime; a failed append would leave a partial runtime name,
    // so the combined length is checked as well
    iox::cxx::string<MAX_NAME_LENGTH> appName;
    if (!appName.unsafe_append(topicPub) || !appName.unsafe_append(topicSub))
    {
        std::cerr << "Both topics together must not exceed " << MAX_NAME_LENGTH << " characters." << std::endl;
        return EXIT_FAILURE;
    }
    iox::runtime::PoshRuntime::initRuntime(appName);

    // ctor launches listener
    Peer peer(topicPub, topicSub);

    // send 1.0, 2.0, 3.0, ... as fast as possible until termination is requested
    double ct = 0.0;
    while (!iox::posix::hasTerminationRequested())
    {
        ct += 1.0;
        peer.SendMsg(ct);
    }
    return EXIT_SUCCESS;
}