
Data races during bidirectional message exchange #2074

Closed
JonasHolleyHjo opened this issue Nov 2, 2023 · 2 comments


@JonasHolleyHjo

Required information

Operating system:
Ubuntu 18.04 via WSL (Windows 11)

Compiler version:
Clang TSan

Eclipse iceoryx version:
v2.0.3 (Blueberry), master branch

Observed result or behaviour:
During the message exchange between two processes, each with one publisher and one subscriber, the thread sanitizer reports data races. The listener is used for receiving data.
Potentially related issues are #2046 and #1119.

Expected result or behaviour:
Bidirectional communication without concurrency issues.

Conditions where it occurred / Performed steps:
The issue can be reproduced with the code snippet below. Just let it run for a while (a restart of the application may be necessary) until the thread sanitizer reports data races.

Questions:
I have a few questions regarding the described problem:
Am I using iceoryx as intended (see the code snippet below)? More precisely, is it allowed to use one publisher and one subscriber per process (I haven't seen this situation in the iceoryx examples)?
Is there a way to fix or circumvent the issue?
Have you experienced any resultant problems?

Detailed description of the problem

Bidirectional communication using the listener leads to data races. The critical parts seem to be the loan() and take() routines of the publisher and the subscriber, respectively. Guarding both the sending and the receiving path with a mutex is unfortunately not possible, as the chosen pub/sub options lead to blocking behavior.
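To illustrate the blocking problem (a hypothetical sketch, not code from this report; commMutex is an invented name): if Peer guarded both paths with one mutex, each process could end up blocked in publish() while holding the very lock its own listener needs to drain the queue via take().

static std::mutex commMutex; // hypothetical guard shared by both paths

void Peer::SendMsg(double val)
{
    std::lock_guard<std::mutex> lock(commMutex);
    _publisher.loan().and_then([&](auto& sample) {
        sample->x = sample->y = sample->z = val;
        sample.publish(); // may block (BLOCK_PRODUCER) while commMutex is held...
    });
}

void Peer::onSampleReceivedCallback(iox::popo::Subscriber<RadarObject>* subscriber)
{
    std::lock_guard<std::mutex> lock(commMutex); // ...so the listener thread waits here
    subscriber->take();                          // and the full queue is never drained
}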

Here is the TSan output:

==================
WARNING: ThreadSanitizer: data race (pid=241)
Write of size 4 at 0x7f6ac40712cc by thread T2 (mutexes: write M0):
#0 iox::concurrent::LoFFLi::push(unsigned int) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32
#1 iox::mepoo::MemPool::freeChunk(void const*) /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:106:24
#2 iox::mepoo::SharedChunk::freeChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:65:47
#3 iox::mepoo::SharedChunk::decrementReferenceCounter() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:58:9
#4 iox::mepoo::SharedChunk::~SharedChunk() /iceoryx/iceoryx_posh/source/mepoo/shared_chunk.cpp:42:5
#5 iox::popo::ChunkReceiver<iox::popo::ChunkReceiverData<256u, iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>::release(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.inl:108:1
#6 iox::popo::SubscriberPortUser::releaseChunk(iox::mepoo::ChunkHeader const*) /iceoryx/iceoryx_posh/source/popo/ports/subscriber_port_user.cpp:74:21
#7 void iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>::operator()(RadarObject const*) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/sample_deleter.inl:37:13
#8 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::operator()(void*, RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:34:16
#9 iox::cxx::function_ref<void (RadarObject const*)>::function_ref<iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&, void>(iox::popo::SampleDeleter<iox::popo::SubscriberPortUser>&)::'lambda'(void*, RadarObject const*)::__invoke(void*, RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:33:25
#10 iox::cxx::function_ref<void (RadarObject const*)>::operator()(RadarObject const*) const /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/function_ref.inl:80:12
#11 iox::cxx::unique_ptr::reset(RadarObject const*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:111:9
#12 iox::cxx::unique_ptr::~unique_ptr() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/unique_ptr.inl:65:5
#13 iox::popo::internal::SmartChunkPrivateData<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunkPrivateData() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:60:38
#14 iox::popo::SmartChunk<iox::popo::PublisherInterface<RadarObject const, iox::mepoo::NoUserHeader const>, RadarObject const, iox::mepoo::NoUserHeader const>::~SmartChunk() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/smart_chunk.hpp:104:36
#15 iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>::~Sample() /iceoryx/iceoryx_posh/include/iceoryx_posh/popo/sample.hpp:37:7
#16 iox::cxx::internal::call_at_index<0ul, iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::destructor(unsigned long, unsigned char*) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant_internal.hpp:101:41
#17 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::call_element_destructor() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:136:9
#18 iox::cxx::variant<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~variant() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/internal/cxx/variant.inl:128:5
#19 iox::cxx::expected<iox::popo::Sample<RadarObject const, iox::mepoo::NoUserHeader const>, iox::popo::ChunkReceiveResult>::~expected() /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/expected.hpp:316:34
#20 Peer::onSampleReceivedCallback(iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>*) /home/Test_Iceoryx_DataRace.cpp:84:9
#21 iox::popo::internal::TranslateAndCallTypelessCallback<iox::popo::Subscriber<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::internal::NoType_t>::call(void*, void*, void (*)()) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/notification_callback.inl:40:5
#22 iox::popo::internal::Event_t::executeCallback() /iceoryx/iceoryx_posh/source/popo/listener.cpp:48:5
#23 auto iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)::operator()(auto) const /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:79
#24 void iox::cxx::forEach<iox::cxx::vector<unsigned short, 256ul>, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto)>(auto&, iox::popo::ListenerImpl<256ul>::threadLoop()::'lambda'(auto) const&) /iceoryx/iceoryx_hoofs/include/iceoryx_hoofs/cxx/helplets.hpp:206:9
#25 iox::popo::ListenerImpl<256ul>::threadLoop() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl:168:9
#26 void std::__invoke_impl<void, void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>(std::__invoke_memfun_deref, void (iox::popo::ListenerImpl<256ul>::*&&)(), iox::popo::ListenerImpl<256ul>*&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:74:14
#27 std::__invoke_result<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>::type std::__invoke<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>(void (iox::popo::ListenerImpl<256ul>::*&&)(), iox::popo::ListenerImpl<256ul>*&&) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/invoke.h:96:14
#28 void std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:252:13
#29 std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>::operator()() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:259:11
#30 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (iox::popo::ListenerImpl<256ul>::*)(), iox::popo::ListenerImpl<256ul>*>>>::_M_run() /usr/local/bin/../lib64/gcc/x86_64-linux-gnu/12.2.0/../../../../include/c++/12.2.0/bits/std_thread.h:210:13
#31 (libstdc++.so.6+0xdf202) (BuildId: c9fa50ce095a2e63a62fe03497f249dbe7dd0a7b)

Previous read of size 4 at 0x7f6ac40712cc by main thread:
#0 iox::concurrent::LoFFLi::pop(unsigned int&) /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:63:40
#1 iox::mepoo::MemPool::getChunk() /iceoryx/iceoryx_posh/source/mepoo/mem_pool.cpp:81:24
#2 iox::mepoo::MemoryManager::getChunk(iox::mepoo::ChunkSettings const&) /iceoryx/iceoryx_posh/source/mepoo/memory_manager.cpp:205:67
#3 iox::popo::ChunkSender<iox::popo::ChunkSenderData<8u, iox::popo::ChunkDistributorData<iox::DefaultChunkDistributorConfig, iox::popo::ThreadSafePolicy, iox::popo::ChunkQueuePusher<iox::popo::ChunkQueueData<iox::DefaultChunkQueueConfig, iox::popo::ThreadSafePolicy>>>>>::tryAllocate(iox::popo::UniquePortId, unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl:144:58
#4 iox::popo::PublisherPortUser::tryAllocateChunk(unsigned int, unsigned int, unsigned int, unsigned int) /iceoryx/iceoryx_posh/source/popo/ports/publisher_port_user.cpp:47:26
#5 iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loanSample() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:75:26
#6 iox::cxx::expected<iox::popo::Sample<RadarObject, iox::mepoo::NoUserHeader>, iox::popo::AllocationError> iox::popo::PublisherImpl<RadarObject, iox::mepoo::NoUserHeader, iox::popo::BasePublisher<iox::popo::PublisherPortUser>>::loan<>() /iceoryx/iceoryx_posh/include/iceoryx_posh/internal/popo/publisher_impl.inl:41:22
#7 Peer::SendMsg(double) /home/Test_Iceoryx_DataRace.cpp:67:20
#8 main /home/Test_Iceoryx_DataRace.cpp:145:14

Location is global '??' at 0x7f6ac2512000 (iceoryx_mgmt+0x1b5f2cc)

Mutex M0 (0x7ffe60026ff8) created at:
[...]

Thread T2 (tid=245, running) created by main thread at:
[...]

SUMMARY: ThreadSanitizer: data race /iceoryx/iceoryx_hoofs/source/concurrent/loffli.cpp:104:32 in iox::concurrent::LoFFLi::push(unsigned int)

==================

Test program

This small test program, based on the iceoryx examples icedelivery and callbacks, can be used to reproduce the data race. Start each instance with two command line arguments, reversed between the two processes (for example ./Test_Iceoryx_DataRace One Two and ./Test_Iceoryx_DataRace Two One).

#include <iostream>

#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/popo/listener.hpp"

constexpr const char GREEN_RIGHT_ARROW[] = "\033[32m->\033[m ";
constexpr const char ORANGE_LEFT_ARROW[] = "\033[33m<-\033[m ";

struct RadarObject
{
    RadarObject() noexcept {}
    RadarObject(double x, double y, double z) noexcept
        : x(x)
        , y(y)
        , z(z)
    {
    }
    double x = 0.0;
    double y = 0.0;
    double z = 0.0;
};

iox::popo::PublisherOptions GetPubOptions()
{
    iox::popo::PublisherOptions publisherOptions;

    publisherOptions.subscriberTooSlowPolicy = iox::popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;

    return publisherOptions;
}

iox::popo::SubscriberOptions GetSubOptions()
{
    iox::popo::SubscriberOptions subscriberOptions;

    subscriberOptions.queueCapacity = 10U;
    subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PRODUCER;

    return subscriberOptions;
}

class Peer
{
public:
    Peer(iox::capro::IdString_t pubTopic, iox::capro::IdString_t subTopic)
        : _publisher({"Radar", pubTopic, "Object"}, GetPubOptions())
        , _subscriber({"Radar", subTopic, "Object"}, GetSubOptions())
    {
        _listener
            .attachEvent(_subscriber, iox::popo::SubscriberEvent::DATA_RECEIVED,
                         iox::popo::createNotificationCallback(onSampleReceivedCallback))
            .or_else([](auto) {
                std::cerr << "unable to attach subscriber" << std::endl;
                std::exit(EXIT_FAILURE);
            });
    }

    void SendMsg(double val)
    {
        _publisher.loan()
            .and_then([&](auto& sample) {
                sample->x = val;
                sample->y = val;
                sample->z = val;
                sample.publish();

                //std::cout << GREEN_RIGHT_ARROW << "Sent value: " << val << std::endl;
            })
            .or_else([](auto& error) {
                std::cerr << "Unable to loan sample, error: " << error << std::endl;
            });        
    }

private:
    static void onSampleReceivedCallback(iox::popo::Subscriber<RadarObject>* subscriber)
    {
        subscriber
            ->take()

            .and_then([/* subscriber*/](auto& sample) {
                //std::cout << ORANGE_LEFT_ARROW << "Received value: " << sample->x << std::endl;
            })

            .or_else([](auto& result) {
                if (result != iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE)
                {
                    std::cout << "Error receiving chunk." << std::endl;
                }
            });
    }

    iox::popo::Publisher<RadarObject> _publisher;
    iox::popo::Subscriber<RadarObject> _subscriber;
    iox::popo::Listener _listener;
};

int main(int argc, char* argv[])
{    
    // command line parsing, get peer name for topic construction
    try
    {
        if (argc != 3)
        {
            std::cout << "Only 2 command line arguments (PubSub topics) allowed." << std::endl;
            throw std::exception();
        }
        else
        {
            std::cout << "Publisher topic is: " << argv[1] << "." << std::endl;
            std::cout << "Subscriber topic is: " << argv[2] << "." << std::endl;
        }
    }
    catch (...)
    {
        return (EXIT_FAILURE);
    }

    // conversion char array to cxx::string
    iox::cxx::string<100> topicPub, topicSub;
    topicPub.unsafe_assign(argv[1]);
    topicSub.unsafe_assign(argv[2]);

    // initialize runtime
    iox::cxx::string<100> appName;
    appName.unsafe_append(topicPub);
    appName.unsafe_append(topicSub);
    iox::runtime::PoshRuntime::initRuntime(appName);

    // ctor launches listener
    Peer peer(topicPub, topicSub);

    double ct = 0.0;
        
    while (!iox::posix::hasTerminationRequested())
    {
        ++ct;
        double sampleValue = ct;
        peer.SendMsg(sampleValue);
    }

    return (EXIT_SUCCESS);
}
@elBoberido
Member

@JonasHolleyHjo although tsan detects data races, the lock-free algorithms take care of this. There are, for example, compare-exchange loops that detect whether a race occurred; if it did, the loop retries until a read succeeds without a race. Unfortunately, tsan has issues with lock-free programming and does not realize that data from a racy read is never used. So there is quite a high chance that the tsan output you showed is a false positive.

I had a look at the code and it looks sound. The tsan warning is for this code in the push method of LoFFLi:

do
{
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) index is limited by capacity
    m_nextFreeIndex[index] = oldHead.indexToNextFreeIndex;
    newHead.indexToNextFreeIndex = index;
    newHead.abaCounter += 1;
} while (!m_head.compare_exchange_weak(oldHead, newHead, std::memory_order_acq_rel, std::memory_order_acquire));

To be more precise, it is for m_nextFreeIndex[index] = oldHead.indexToNextFreeIndex;. This isn't an issue, though. The slot at index is still owned by the thread which pushes to the container, so writing to that location is totally safe. The while loop ensures that the memory is properly synchronized and not accessed after ownership has passed to the container.
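As a self-contained illustration of why this is safe (a simplified sketch in the spirit of LoFFLi, not the actual sources):

#include <atomic>
#include <cstdint>

struct Head
{
    uint32_t next; // index of the first free slot
    uint32_t aba;  // ABA counter
};

static std::atomic<Head> head;
static uint32_t nextFreeIndex[256];

void push(uint32_t index)
{
    Head oldHead = head.load(std::memory_order_acquire);
    Head newHead{};
    do
    {
        // TSan flags this plain write, but the slot at 'index' is still owned
        // exclusively by this thread; it becomes visible to other threads only
        // after the releasing CAS below succeeds.
        nextFreeIndex[index] = oldHead.next;
        newHead.next = index;
        newHead.aba = oldHead.aba + 1;
    } while (!head.compare_exchange_weak(
        oldHead, newHead, std::memory_order_acq_rel, std::memory_order_acquire));
}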

We did not yet look at the tsan warnings, but we are quite confident that most, if not all, are false positives, since our lock-free code has been running for quite a long time and we did not get bug reports for data races. At some point we need to look more closely at the warnings and suppress them if they are indeed false positives like the one explained above.
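For reference, once confirmed, such false positives could be silenced with a TSan suppressions file along these lines (a sketch; the entries are illustrative):

# tsan_suppressions.txt, enabled via TSAN_OPTIONS="suppressions=tsan_suppressions.txt"
# illustrative entries for the lock-free free list discussed above
race:iox::concurrent::LoFFLi::push
race:iox::concurrent::LoFFLi::pop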

What I'm more concerned about is not our lock-free data structures but things like accidentally accessing members from the wrong thread, e.g. with the Listener. Unfortunately, if there are such issues, they are currently buried in all the false positives from the lock-free data structures.

Your example looks good. I'm not quite sure what you mean by is it allowed to use one publisher and one subscriber per process. You can use any combination of Publisher and Subscriber from different processes or threads, but each process or thread must have its own instance; if the same instance is used across threads, it must be guarded by a mutex (a sketch of that variant follows the two examples below).

This is possible:

Publisher<Foo> pub1({"a", "b", "c"});
Publisher<Foo> pub2({"a", "b", "c"});

auto th1 = std::thread([&] {
    // use pub1
});
auto th2 = std::thread([&] {
    // use pub2
});

This is not allowed:

Publisher<Foo> pub({"a", "b", "c"});

auto th1 = std::thread([&] {
    // use pub
});
auto th2 = std::thread([&] {
    // use pub
});

A publisher can transmit data across threads or to other processes, and a subscriber can receive data from other threads or processes, but a single publisher or subscriber instance must not be accessed from multiple threads without synchronization.
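If one instance really has to be shared, the mutex-guarded variant mentioned above would look roughly like this (a sketch; requires <mutex> and <thread>):

Publisher<Foo> pub({"a", "b", "c"});
std::mutex pubMutex;

auto th1 = std::thread([&] {
    std::lock_guard<std::mutex> lock(pubMutex);
    // use pub
});
auto th2 = std::thread([&] {
    std::lock_guard<std::mutex> lock(pubMutex);
    // use pub
});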

@JonasHolleyHjo
Author

Alright, thanks for your detailed answer!
