
Data races during bidirectional message exchange #2074

Closed
@JonasHolleyHjo

Description


Required information

Operating system:
Ubuntu 18.04 via WSL (Windows 11)

Compiler version:
Clang with ThreadSanitizer (TSan)

Eclipse iceoryx version:
v2.0.3 (Blueberry), master branch

Observed result or behaviour:
When two processes exchange messages, each with one publisher and one subscriber, ThreadSanitizer reports data races. Data is received via the listener (iox::popo::Listener).
Potentially related issues are #2046 and #1119.

Expected result or behaviour:
Bidirectional communication without concurrency issues.

Conditions where it occurred / Performed steps:
The issue can be reproduced with the code snippet below. Let it run for a while (restarting the application may be necessary) until ThreadSanitizer reports data races.

Questions:
I have a few questions regarding the described problem:
Am I using iceoryx as intended (see the code snippet below)? More precisely, is it allowed to use one publisher and one subscriber per process (I haven't seen this situation in the iceoryx examples)?
Is there a way to fix or circumvent the issue?
Have you experienced any resulting problems?

Detailed description of the problem

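Bidirectional communication via the listener leads to data races. The critical parts seem to be the loan() routine of the publisher and the take() routine of the subscriber (more precisely, the release of the taken sample when it goes out of scope at the end of the callback, see the TSan output below). Guarding the sending and receiving parts with a mutex is unfortunately not possible: with the chosen pub/sub options (ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER and QueueFullPolicy::BLOCK_PRODUCER), publishing blocks while the subscriber queue is full, so a publisher holding the mutex can keep the listener from taking the samples that would free up the queue.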
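The blocking argument can be illustrated with a small, self-contained model. This is a simplification under stated assumptions, not iceoryx code: the hypothetical `BlockingQueue` stands in for a subscriber queue with `QueueFullPolicy::BLOCK_PRODUCER` (capacity 1, already full), and `userLock` is the hypothetical mutex that would guard both sending and receiving. The sender publishes while holding the lock, so it waits for free space, while the receiver, which is the only party that could free that space, cannot acquire the lock. In the two-process setup of this issue the cycle runs through both processes' publishers, queues and locks; the model collapses it into one queue and one mutex, and uses a timeout and `try_lock()` only so that it terminates.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a subscriber queue with BLOCK_PRODUCER semantics:
// push() waits while the queue is full (bounded by a timeout so the sketch ends).
class BlockingQueue
{
  public:
    explicit BlockingQueue(std::size_t capacity) : m_capacity(capacity) {}

    bool push(int value, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!m_cv.wait_for(lock, timeout, [this] { return m_queue.size() < m_capacity; }))
        {
            return false; // still full; a real blocking publisher would keep waiting
        }
        m_queue.push_back(value);
        m_cv.notify_all();
        return true;
    }

    bool pop(int& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }
        value = m_queue.front();
        m_queue.pop_front();
        m_cv.notify_all(); // space became available for a waiting push()
        return true;
    }

  private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<int> m_queue;
    std::size_t m_capacity;
};

// Models "lock a user mutex around sending and around receiving" with a full queue.
// Returns true if the sender stalls because the receiver cannot get the lock.
bool userLockStallsSender()
{
    BlockingQueue queue(1);
    queue.push(0, std::chrono::milliseconds(0)); // queue is now full

    std::mutex userLock; // hypothetical mutex shared by the send and receive paths
    bool receiverTookSample = false;

    std::unique_lock<std::mutex> sendGuard(userLock); // sender: lock, then publish
    std::thread receiver([&] {
        // A listener callback would call lock() and block here;
        // try_lock() only keeps this sketch from hanging.
        if (userLock.try_lock())
        {
            int value = 0;
            receiverTookSample = queue.pop(value);
            userLock.unlock();
        }
    });
    // Waits for free space that only the locked-out receiver could provide.
    const bool sent = queue.push(1, std::chrono::milliseconds(50));
    receiver.join(); // the lock is still held here, so try_lock() above has failed
    sendGuard.unlock();
    return !sent && !receiverTookSample;
}
```

`userLockStallsSender()` returns `true`: the timed push fails and the receiver never takes the sample. With an unbounded wait and a blocking `lock()` instead of `try_lock()`, as with a real blocking publisher and listener callback, the same interleaving would not terminate.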

Here is the TSan output:

==================
WARNING: ThreadSanitizer: data race (pid=241)
Write of size 4 at 0x7f6ac40712cc by thread T2 (mutexes: write M0):
#0 iox::concurrent::LoFFLi::push(unsigned int) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32
#1 iox::mepoo::MemPool::freeChunk(void const*) /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:106:24
#2 iox::mepoo::SharedChunk::freeChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:65:47
#3 iox::mepoo::SharedChunk::decrementReferenceCounter() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:58:9
#4 iox::mepoo::SharedChunk::~SharedChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:42:5
#5 iox::popo::ChunkReceiver<iox::popo::ChunkReceiverData<256u, iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>::release(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.inl:108:1
#6 iox::popo::SubscriberPortUser::releaseChunk(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/source/popo/ports/subscriber_port_user.cpp:74:21
#7 void iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>::operator()(RadarObject const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/sample_deleter.inl:37:13
#8 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::operator()(void*, RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:34:16
#9 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::__invoke(void*, RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:33:25
#10 iox::cxx::function_ref<void (RadarObject const*)>::operator()(RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:80:12
#11 iox::cxx::unique_ptr::reset(RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:111:9
#12 iox::cxx::unique_ptr::~unique_ptr() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:65:5
#13 iox::popo::internal::SmartChunkPrivateData<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunkPrivateData() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:60:38
#14 iox::popo::SmartChunk<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunk() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:104:36
#15 iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>::~Sample() /iceoryx/iceoryx_posh/include/iceoryx_posh/popo/sample.hpp:37:7
#16 iox::cxx::internal::call_at_index<0ul, iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::destructor(unsigned long, unsigned char*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant_internal.hpp:101:41
#17 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::call_element_destructor() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:136:9
#18 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~variant() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:128:5
#19 iox::cxx::expected<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~expected() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/expected.hpp:316:34
#20 Peer::onSampleReceivedCallback(iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>) /home/Test_Iceoryx_DataRace.cpp:84:9
#21 iox::popo::internal::TranslateAndCallTypelessCallback<iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::internal::NoType_t>::call(void*, void*, void (*)()) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/notification_callback.inl:40:5
#22 iox::popo::internal::Event_t::executeCallback() /iceoryx/iceoryx_posh/source/popo/listener.cpp:48:5
#23 auto iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)::operator()(auto) const /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:79
#24 void iox::cxx::forEach<iox::cxx::vector<unsigned short, 256ul>, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)>(auto&, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto) const&) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/helplets.hpp:206:9
#25 iox::popo::ListenerImpl<256ul>::threadLoop() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:9
#26 void std::__invoke_impl<void, void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>(std::__invoke_memfun_deref, void (iox::popo::ListenerImpl<256ul>::*&&)(), iox::popo::ListenerImpl<256ul>*&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:74:14
#27 std::__invoke_result<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>::type std::__invoke<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>(void (iox::popo::ListenerImpl<256ul>::*&&)(), iox::popo::ListenerImpl<256ul>*&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:96:14
#28 void std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:252:13
#29 std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>::operator()() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:259:11
#30 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>>::_M_run() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:210:13
#31 (libstdc++.so.6+0xdf202) (BuildId: c9fa50ce095a2e63a62fe03497f249dbe7dd0a7b)

Previous read of size 4 at 0x7f6ac40712cc by main thread:
#0 iox::concurrent::LoFFLi::pop(unsigned int&) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:63:40
#1 iox::mepoo::MemPool::getChunk() /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:81:24
#2 iox::mepoo::MemoryManager::getChunk(iox::mepoo::ChunkSettings const&) /iceoryx/iceoryx_posh/source/mepoo/memory_manager.cpp:205:67
#3 iox::popo::ChunkSender<iox::popo::ChunkSenderData<8u, iox::popo::ChunkDistributorData<iox::DefaultChunkDistributorConfig, iox::popo::ThreadSafePolicy, iox::popo::ChunkQueuePusher<iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>>>::tryAllocate(iox::popo::UniquePortId, unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl:144:58
#4 iox::popo::PublisherPortUser::tryAllocateChunk(unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/source/popo/ports/publisher_port_user.cpp:47:26
#5 iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loanSample() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:75:26
#6 iox::cxx::expected<iox::popo::Sample<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::AllocationError> iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loan<>() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:41:22
#7 Peer::SendMsg(double) /home/Test_Iceoryx_DataRace.cpp:67:20
#8 main /home/Test_Iceoryx_DataRace.cpp:145:14

Location is global '??' at 0x7f6ac2512000 (iceoryx_mgmt+0x1b5f2cc)

Mutex M0 (0x7ffe60026ff8) created at:
[...]

Thread T2 (tid=245, running) created by main thread at:
[...]

SUMMARY: ThreadSanitizer: data race /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32 in iox::concurrent::LoFFLi::push(unsigned int)

==================
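Both stacks in the trace above meet in the same 4-byte slot: the listener thread T2 writes it in LoFFLi::push() while the released chunk is returned to its mempool (MemPool::freeChunk()), and the main thread previously read it in LoFFLi::pop() while loaning a chunk (MemPool::getChunk()). A common way to build a lock-free free list of indices is a Treiber-style stack with an ABA-tagged atomic head: pop() reads the successor of the current head speculatively, and a compare-and-swap discards that value if the head changed in the meantime. If the successor array holds plain integers (the 4-byte accesses in the report are consistent with 32-bit indices), the speculative read and a concurrent push() are unsynchronized non-atomic accesses to the same location, which is the pattern TSan flags. The sketch below is a hypothetical, simplified free list of that kind, not iceoryx's LoFFLi; `IndexFreeList` and `kInvalidIndex` are illustrative names. It stores the successor indices as `std::atomic<std::uint32_t>` with relaxed ordering, which keeps the algorithm but makes the speculative read well-defined. Whether the report reflects a real defect in LoFFLi or a race its algorithm tolerates is not settled by this sketch.

```cpp
#include <atomic>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical marker for "no next element" (not an iceoryx constant).
constexpr std::uint32_t kInvalidIndex = std::numeric_limits<std::uint32_t>::max();

// Hypothetical, simplified index free list (Treiber stack with ABA tag).
// NOT iceoryx's LoFFLi; it only mirrors the pop/push access pattern in the trace.
class IndexFreeList
{
  public:
    explicit IndexFreeList(std::uint32_t capacity)
        : m_capacity(capacity)
        , m_head(Head{capacity > 0 ? std::uint32_t{0} : kInvalidIndex, 0U})
        , m_next(capacity)
    {
        // Initial chain: 0 -> 1 -> ... -> capacity - 1 -> kInvalidIndex
        for (std::uint32_t i = 0; i < capacity; ++i)
        {
            m_next[i].store(i + 1 < capacity ? i + 1 : kInvalidIndex, std::memory_order_relaxed);
        }
    }

    // Takes a free index; returns false if the list is empty.
    bool pop(std::uint32_t& index)
    {
        Head oldHead = m_head.load(std::memory_order_acquire);
        Head newHead{};
        do
        {
            if (oldHead.index == kInvalidIndex)
            {
                return false;
            }
            // Speculative read: a concurrent push() may be writing this slot.
            // With a plain uint32_t this is the read/write pair TSan reports;
            // std::atomic makes it well-defined, and the tagged CAS rejects stale values.
            newHead.index = m_next[oldHead.index].load(std::memory_order_relaxed);
            newHead.aba = oldHead.aba + 1U;
        } while (!m_head.compare_exchange_weak(
            oldHead, newHead, std::memory_order_acq_rel, std::memory_order_acquire));
        index = oldHead.index;
        return true;
    }

    // Returns an index to the list; returns false if it is out of range.
    bool push(std::uint32_t index)
    {
        if (index >= m_capacity)
        {
            return false;
        }
        Head oldHead = m_head.load(std::memory_order_acquire);
        Head newHead{};
        do
        {
            // Link the returned index in front of the current head.
            m_next[index].store(oldHead.index, std::memory_order_relaxed);
            newHead.index = index;
            newHead.aba = oldHead.aba + 1U;
        } while (!m_head.compare_exchange_weak(
            oldHead, newHead, std::memory_order_release, std::memory_order_relaxed));
        return true;
    }

  private:
    struct Head
    {
        std::uint32_t index; // first free index, or kInvalidIndex
        std::uint32_t aba;   // incremented on every change to detect ABA
    };

    std::uint32_t m_capacity;
    std::atomic<Head> m_head;
    std::vector<std::atomic<std::uint32_t>> m_next;
};
```

In a single-threaded run, popping a list of capacity 4 yields the indices 0, 1, 2 and 3 in that order, a fifth pop() fails, and push(4) is rejected as out of range.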

Test program

This small test program, based on the iceoryx examples icedelivery and callbacks, reproduces the data race. With RouDi (iox-roudi) running, start two instances, each with two command line arguments (publisher topic, subscriber topic) given in reversed order, for example ./Test_Iceoryx_DataRace One Two and ./Test_Iceoryx_DataRace Two One.

#include <cstdlib>
#include <iostream>

#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/popo/listener.hpp"

constexpr const char GREEN_RIGHT_ARROW[] = "\033[32m->\033[m ";
constexpr const char ORANGE_LEFT_ARROW[] = "\033[33m<-\033[m ";

struct RadarObject
{
    RadarObject() noexcept {}
    RadarObject(double x, double y, double z) noexcept
        : x(x)
        , y(y)
        , z(z)
    {
    }
    double x = 0.0;
    double y = 0.0;
    double z = 0.0;
};

iox::popo::PublisherOptions GetPubOptions()
{
    iox::popo::PublisherOptions publisherOptions;

    publisherOptions.subscriberTooSlowPolicy = iox::popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;

    return publisherOptions;
}

iox::popo::SubscriberOptions GetSubOptions()
{
    iox::popo::SubscriberOptions subscriberOptions;

    subscriberOptions.queueCapacity = 10U;
    subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PRODUCER;

    return subscriberOptions;
}

class Peer
{
public:
    Peer(iox::capro::IdString_t pubTopic, iox::capro::IdString_t subTopic)
        : _publisher({"Radar", pubTopic, "Object"}, GetPubOptions())
        , _subscriber({"Radar", subTopic, "Object"}, GetSubOptions())
    {
        _listener
            .attachEvent(_subscriber, iox::popo::SubscriberEvent::DATA_RECEIVED,
                         iox::popo::createNotificationCallback(onSampleReceivedCallback))
            .or_else([](auto) {
                std::cerr << "unable to attach subscriber" << std::endl;
                std::exit(EXIT_FAILURE);
            });
    }

    void SendMsg(double val)
    {
        _publisher.loan()
            .and_then([&](auto& sample) {
                sample->x = val;
                sample->y = val;
                sample->z = val;
                sample.publish();

                //std::cout << GREEN_RIGHT_ARROW << "Sent value: " << val << std::endl;
            })
            .or_else([](auto& error) {
                std::cerr << "Unable to loan sample, error: " << error << std::endl;
            });
    }

private:
    static void onSampleReceivedCallback(iox::popo::Subscriber<RadarObject>* subscriber)
    {
        subscriber
            ->take()

            .and_then([/* subscriber*/](auto& sample) {
                //std::cout << ORANGE_LEFT_ARROW << "Received value: " << sample->x << std::endl;
            })

            .or_else([](auto& result) {
                if (result != iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE)
                {
                    std::cout << "Error receiving chunk." << std::endl;
                }
            });
    }

    iox::popo::Publisher<RadarObject> _publisher;
    iox::popo::Subscriber<RadarObject> _subscriber;
    iox::popo::Listener _listener;
};

int main(int argc, char* argv[])
{
    // command line parsing: argv[1] is the publisher topic, argv[2] the subscriber topic
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " <publisher topic> <subscriber topic>" << std::endl;
        return EXIT_FAILURE;
    }
    std::cout << "Publisher topic is: " << argv[1] << "." << std::endl;
    std::cout << "Subscriber topic is: " << argv[2] << "." << std::endl;

    // conversion char array to cxx::string
    iox::cxx::string<100> topicPub, topicSub;
    topicPub.unsafe_assign(argv[1]);
    topicSub.unsafe_assign(argv[2]);

    // initialize runtime
    iox::cxx::string<100> appName;
    appName.unsafe_append(topicPub);
    appName.unsafe_append(topicSub);
    iox::runtime::PoshRuntime::initRuntime(appName);

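    // the Peer constructor attaches the subscriber to the listener;
    // the listener's own thread then runs onSampleReceivedCallback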
    Peer peer(topicPub, topicSub);

    double ct = 0.0;

    // publish continuously until SIGINT or SIGTERM is received
    while (!iox::posix::hasTerminationRequested())
    {
        ++ct;
        peer.SendMsg(ct);
    }

    return (EXIT_SUCCESS);
}
