Skip to content

Commit 548237c

Browse files
committed
Update to latest DelegateMQ library
1 parent a3bc7dd commit 548237c

File tree

15 files changed

+410
-184
lines changed

15 files changed

+410
-184
lines changed

DelegateMQ/DelegateMQ.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
#include "delegate/DelegateOpt.h"
5858
#include "delegate/MulticastDelegateSafe.h"
59-
#include "delegate/UnicastDelegate.h"
59+
#include "delegate/UnicastDelegateSafe.h"
6060
#include "delegate/DelegateAsync.h"
6161
#include "delegate/DelegateAsyncWait.h"
6262
#include "delegate/DelegateRemote.h"

DelegateMQ/delegate/DelegateAsyncWait.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
350350
return GetRetVal();
351351
} else {
352352
// Return a default return value
353-
return RetType{};
353+
return RetType();
354354
}
355355
}
356356
}
@@ -427,7 +427,7 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
427427
return std::any_cast<RetType>(m_retVal);
428428
}
429429
catch (const std::bad_any_cast&) {
430-
return RetType{}; // Return a default value if error
430+
return RetType(); // Return a default value if error
431431
}
432432
}
433433

@@ -753,7 +753,7 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
753753
return GetRetVal();
754754
} else {
755755
// Return a default return value
756-
return RetType{};
756+
return RetType();
757757
}
758758
}
759759
}
@@ -830,7 +830,7 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
830830
return std::any_cast<RetType>(m_retVal);
831831
}
832832
catch (const std::bad_any_cast&) {
833-
return RetType{}; // Return a default value if error
833+
return RetType(); // Return a default value if error
834834
}
835835
}
836836

@@ -1075,7 +1075,7 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
10751075
return GetRetVal();
10761076
} else {
10771077
// Return a default return value
1078-
return RetType{};
1078+
return RetType();
10791079
}
10801080
}
10811081
}
@@ -1152,7 +1152,7 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
11521152
return std::any_cast<RetType>(m_retVal);
11531153
}
11541154
catch (const std::bad_any_cast&) {
1155-
return RetType{}; // Return a default value if error
1155+
return RetType(); // Return a default value if error
11561156
}
11571157
}
11581158

DelegateMQ/delegate/ISerializer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class ISerializer<RetType(Args...)>
2626
/// data for transport.
2727
/// @param[out] os The output stream
2828
/// @param[in] args The target function arguments
29-
/// @return The input stream
29+
/// @return The output stream
3030
virtual std::ostream& Write(std::ostream& os, Args... args) = 0;
3131

3232
/// Inheriting class implements the read function to unserialize data

DelegateMQ/delegate/MulticastDelegateSafe.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,16 @@ class MulticastDelegateSafe<RetType(Args...)> : public MulticastDelegate<RetType
2323

2424
MulticastDelegateSafe() = default;
2525
~MulticastDelegateSafe() = default;
26-
MulticastDelegateSafe(const MulticastDelegateSafe& rhs) = delete;
27-
MulticastDelegateSafe(MulticastDelegateSafe&& rhs) = delete;
26+
27+
MulticastDelegateSafe(const MulticastDelegateSafe& rhs) : BaseType() {
28+
const std::lock_guard<std::mutex> lock(m_lock);
29+
BaseType::operator=(rhs);
30+
}
31+
32+
MulticastDelegateSafe(MulticastDelegateSafe&& rhs) : BaseType() {
33+
const std::lock_guard<std::mutex> lock(m_lock);
34+
BaseType::operator=(std::move(rhs));
35+
}
2836

2937
/// Invoke the bound target function for all stored delegate instances.
3038
/// A void return value is used since multiple targets invoked.
@@ -84,14 +92,14 @@ class MulticastDelegateSafe<RetType(Args...)> : public MulticastDelegate<RetType
8492
/// @return A reference to the current object.
8593
MulticastDelegateSafe& operator=(MulticastDelegateSafe&& rhs) noexcept {
8694
const std::lock_guard<std::mutex> lock(m_lock);
87-
BaseType::operator=(std::forward<MulticastDelegateSafe>(rhs));
95+
BaseType::operator=(std::move(rhs));
8896
return *this;
8997
}
9098

9199
/// @brief Clear the all target functions.
92100
virtual void operator=(std::nullptr_t) noexcept {
93101
const std::lock_guard<std::mutex> lock(m_lock);
94-
return BaseType::Clear();
102+
BaseType::Clear();
95103
}
96104

97105
/// Insert a delegate into the container.

DelegateMQ/delegate/UnicastDelegate.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ class UnicastDelegate<RetType(Args...)>
4343
/// @param[in] args The arguments used when invoking the target function
4444
/// @return The target function return value.
4545
RetType operator()(Args... args) {
46-
return (*m_delegate)(args...); // Invoke delegate callback
46+
if (m_delegate)
47+
return (*m_delegate)(args...); // Invoke delegate callback
48+
else
49+
return RetType();
4750
}
4851

4952
/// Invoke the bound target functions.
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#ifndef _UNICAST_DELEGATE_SAFE_H
2+
#define _UNICAST_DELEGATE_SAFE_H
3+
4+
/// @file
5+
/// @brief Delegate container for storing an invoking a single delegate instance.
6+
/// Class is not thread-safe.
7+
8+
#include "UnicastDelegate.h"
9+
#include <mutex>
10+
11+
namespace dmq {
12+
13+
template <class R>
14+
struct UnicastDelegateSafe; // Not defined
15+
16+
/// @brief A thread-safe delegate container storing one delegate. Void and
17+
/// non-void return values supported.
18+
template<class RetType, class... Args>
19+
class UnicastDelegateSafe<RetType(Args...)> : public UnicastDelegate<RetType(Args...)>
20+
{
21+
public:
22+
using DelegateType = Delegate<RetType(Args...)>;
23+
using BaseType = UnicastDelegate<RetType(Args...)>;
24+
25+
UnicastDelegateSafe() = default;
26+
~UnicastDelegateSafe() = default;
27+
28+
UnicastDelegateSafe(const UnicastDelegateSafe& rhs) : BaseType() {
29+
const std::lock_guard<std::mutex> lock(m_lock);
30+
BaseType::operator=(rhs);
31+
}
32+
33+
UnicastDelegateSafe(UnicastDelegateSafe&& rhs) : BaseType() {
34+
const std::lock_guard<std::mutex> lock(m_lock);
35+
BaseType::operator=(std::move(rhs));
36+
}
37+
38+
/// Invoke the bound target.
39+
/// @param[in] args The arguments used when invoking the target function
40+
/// @return The target function return value.
41+
RetType operator()(Args... args) {
42+
const std::lock_guard<std::mutex> lock(m_lock);
43+
BaseType::operator ()(args...);
44+
}
45+
46+
/// Invoke the bound target functions.
47+
/// @param[in] args The arguments used when invoking the target function
48+
void Broadcast(Args... args) {
49+
const std::lock_guard<std::mutex> lock(m_lock);
50+
BaseType::Broadcast(args...);
51+
}
52+
53+
/// Assign a delegate to the container.
54+
/// @param[in] rhs A delegate target to assign
55+
void operator=(const DelegateType& rhs) {
56+
const std::lock_guard<std::mutex> lock(m_lock);
57+
BaseType::operator=(rhs);
58+
}
59+
60+
/// Assign a delegate to the container.
61+
/// @param[in] rhs A delegate target to assign
62+
void operator=(DelegateType&& rhs) {
63+
const std::lock_guard<std::mutex> lock(m_lock);
64+
BaseType::operator=(rhs);
65+
}
66+
67+
/// @brief Assignment operator that assigns the state of one object to another.
68+
/// @param[in] rhs The object whose state is to be assigned to the current object.
69+
/// @return A reference to the current object.
70+
UnicastDelegateSafe& operator=(const UnicastDelegateSafe& rhs) {
71+
const std::lock_guard<std::mutex> lock(m_lock);
72+
BaseType::operator=(rhs);
73+
return *this;
74+
}
75+
76+
/// @brief Move assignment operator that transfers ownership of resources.
77+
/// @param[in] rhs The object to move from.
78+
/// @return A reference to the current object.
79+
UnicastDelegateSafe& operator=(UnicastDelegateSafe&& rhs) noexcept {
80+
const std::lock_guard<std::mutex> lock(m_lock);
81+
BaseType::operator=(std::move(rhs));
82+
return *this;
83+
}
84+
85+
/// @brief Clear the all target functions.
86+
virtual void operator=(std::nullptr_t) noexcept {
87+
const std::lock_guard<std::mutex> lock(m_lock);
88+
BaseType::Clear();
89+
}
90+
91+
/// Any registered delegates?
92+
/// @return `true` if delegate container is empty.
93+
bool Empty() {
94+
const std::lock_guard<std::mutex> lock(m_lock);
95+
return BaseType::Empty();
96+
}
97+
98+
/// Remove the registered delegate
99+
void Clear() {
100+
const std::lock_guard<std::mutex> lock(m_lock);
101+
BaseType::Clear();
102+
}
103+
104+
/// Get the number of delegates stored.
105+
/// @return The number of delegates stored.
106+
std::size_t Size() {
107+
const std::lock_guard<std::mutex> lock(m_lock);
108+
return BaseType::Size();
109+
}
110+
111+
/// @brief Implicit conversion operator to `bool`.
112+
/// @return `true` if the container is not empty, `false` if the container is empty.
113+
explicit operator bool() {
114+
const std::lock_guard<std::mutex> lock(m_lock);
115+
return BaseType::operator bool();
116+
}
117+
118+
private:
119+
/// Lock to make the class thread-safe
120+
std::mutex m_lock;
121+
};
122+
123+
}
124+
125+
#endif

DelegateMQ/predef/os/stdlib/Thread.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using namespace dmq;
1515
//----------------------------------------------------------------------------
1616
// Thread
1717
//----------------------------------------------------------------------------
18-
Thread::Thread(const std::string& threadName) : m_thread(nullptr), THREAD_NAME(threadName)
18+
Thread::Thread(const std::string& threadName) : m_thread(nullptr), m_exit(false), THREAD_NAME(threadName)
1919
{
2020
}
2121

@@ -109,15 +109,25 @@ void Thread::ExitThread()
109109
m_cv.notify_one();
110110
}
111111

112+
m_exit.store(true);
112113
m_thread->join();
113-
m_thread = nullptr;
114+
115+
// Clear the queue if anything added while waiting for join
116+
{
117+
lock_guard<mutex> lock(m_mutex);
118+
m_thread = nullptr;
119+
while (!m_queue.empty())
120+
m_queue.pop();
121+
}
114122
}
115123

116124
//----------------------------------------------------------------------------
117125
// DispatchDelegate
118126
//----------------------------------------------------------------------------
119127
void Thread::DispatchDelegate(std::shared_ptr<dmq::DelegateMsg> msg)
120128
{
129+
if (m_exit.load())
130+
return;
121131
if (m_thread == nullptr)
122132
throw std::invalid_argument("Thread pointer is null");
123133

DelegateMQ/predef/os/stdlib/Thread.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class Thread : public dmq::IThread
6262
// Promise and future to synchronize thread start
6363
std::promise<void> m_threadStartPromise;
6464
std::future<void> m_threadStartFuture;
65+
66+
std::atomic<bool> m_exit;
6567
};
6668

6769
#endif

DelegateMQ/predef/serialize/msgpack/Serializer.h

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,36 @@
1111
#include "msgpack.hpp"
1212
#include "delegate/ISerializer.h"
1313
#include <iostream>
14+
#include <type_traits>
1415

15-
// make_serialized serializes each remote function argument
16-
template<typename... Ts>
17-
void make_serialized(msgpack::sbuffer& buffer) { }
16+
// Type trait to check if a type is const
17+
template <typename T>
18+
using is_const_type = std::is_const<std::remove_reference_t<T>>;
1819

20+
// make_serialized serializes each remote function argument
1921
template<typename Arg1, typename... Args>
2022
void make_serialized(msgpack::sbuffer& buffer, Arg1& arg1, Args... args) {
2123
msgpack::pack(buffer, arg1);
22-
make_serialized(buffer, args...);
24+
25+
// Recursively call for other arguments
26+
if constexpr (sizeof...(args) > 0) {
27+
make_serialized(buffer, args...);
28+
}
2329
}
2430

2531
// make_unserialized unserializes each remote function argument
26-
template<typename... Ts>
27-
void make_unserialized(msgpack::unpacker& unpacker) { }
28-
2932
template<typename Arg1, typename... Args>
3033
void make_unserialized(msgpack::unpacker& unpacker, Arg1& arg1, Args&&... args) {
34+
static_assert(!is_const_type<Arg1>::value, "Arg1 cannot be const.");
3135
msgpack::object_handle oh;
3236
if (!unpacker.next(oh))
3337
throw std::runtime_error("Error during MsgPack unpacking.");
3438
arg1 = oh.get().as<Arg1>();
35-
make_unserialized(unpacker, args...);
39+
40+
// Recursively call for other arguments
41+
if constexpr (sizeof...(args) > 0) {
42+
make_unserialized(unpacker, args...);
43+
}
3644
}
3745

3846
template <class R>
@@ -45,9 +53,15 @@ class Serializer<RetType(Args...)> : public dmq::ISerializer<RetType(Args...)>
4553
public:
4654
// Write arguments to a stream
4755
virtual std::ostream& Write(std::ostream& os, Args... args) override {
48-
msgpack::sbuffer buffer;
49-
make_serialized(buffer, args...);
50-
os.write(buffer.data(), buffer.size());
56+
try {
57+
msgpack::sbuffer buffer;
58+
make_serialized(buffer, args...);
59+
os.write(buffer.data(), buffer.size());
60+
}
61+
catch (const std::exception& e) {
62+
std::cerr << "Serialize error: " << e.what() << std::endl;
63+
throw;
64+
}
5165
return os;
5266
}
5367

0 commit comments

Comments
 (0)