Skip to content

Commit dcb1756

Browse files
authored
Merge pull request #35 from doug1234/replaceFilterFix
Fix for replaceFilter not working correctly
2 parents e7ab825 + 80453be commit dcb1756

File tree

3 files changed

+202
-34
lines changed

3 files changed

+202
-34
lines changed

src/dds_callback.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,6 @@ class Emitter : public EmitterBase
312312
/// The data reader for the target DDS topic.
313313
DDS::DataReader_var m_reader;
314314

315-
/// Is the thread running
316-
bool m_running = false;
317-
318315
/// The data reader thread
319316
std::thread m_dataThread;
320317

src/dds_manager.cpp

Lines changed: 176 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ bool DDSManager::addPartition(const std::string& topicName,
577577
//------------------------------------------------------------------------------
578578
bool DDSManager::createSubscriber(const std::string& topicName,
579579
const std::string& readerName,
580-
const std::string& filter)
580+
const std::string& filter, const DDS::StringSeq &filterParams)
581581
{
582582
// Make sure the data reader name is valid
583583
if (readerName.empty())
@@ -641,14 +641,14 @@ bool DDSManager::createSubscriber(const std::string& topicName,
641641
// Create a new filtered topic if requested
642642
if (!filter.empty())
643643
{
644-
const DDS::StringSeq noParams;
645-
const std::string filterName = topicName + "_" + readerName;
644+
const std::string filterName = topicName + "_" + readerName + "_0";
646645
DDS::ContentFilteredTopic_var filteredTopic =
647646
m_domainParticipant->create_contentfilteredtopic(
648647
filterName.c_str(),
649648
topicGroup->topic,
650649
filter.c_str(),
651-
noParams);
650+
filterParams);
651+
652652

653653
if (!filteredTopic)
654654
{
@@ -784,7 +784,8 @@ bool DDSManager::createPublisher(const std::string& topicName)
784784
//------------------------------------------------------------------------------
785785
bool DDSManager::createPublisherSubscriber(const std::string& topicName,
786786
const std::string& readerName,
787-
const std::string& filter)
787+
const std::string& filter,
788+
const DDS::StringSeq &filterParams)
788789
{
789790
// TODO: Remove this function (No longer used).
790791
bool pass = false;
@@ -795,7 +796,7 @@ bool DDSManager::createPublisherSubscriber(const std::string& topicName,
795796
return false;
796797
}
797798

798-
pass = createSubscriber(topicName, readerName, filter);
799+
pass = createSubscriber(topicName, readerName, filter, filterParams);
799800
if (!pass)
800801
{
801802
return false;
@@ -929,33 +930,57 @@ bool DDSManager::replaceFilter(const std::string& topicName,
929930
decltype(m_sharedLock) lock(m_topicMutex);
930931
std::shared_ptr<TopicGroup> topicGroup = m_topics[topicName];
931932

932-
if (topicGroup->m_readerListeners.find(readerName) != topicGroup->m_readerListeners.end()) {
933+
if (topicGroup->m_readerListeners.find(readerName) == topicGroup->m_readerListeners.end()) {
933934
std::cerr << "Error in replaceFilter: Reader listener '" << readerName
934-
<< "' already registered for topic '"
935+
<< "' not registered for topic '"
935936
<< topicName
936937
<< "'." << std::endl;
937938
return false;
938939
}
939940

940-
// Stop the emitter if it exists
941+
// Stop the emitter if it exists (exists for callbacks)
941942
EmitterBase* emitter = nullptr;
942943
if (topicGroup->emitters.find(readerName) != topicGroup->emitters.end())
943944
{
945+
std::cerr << "found emitter when trying to stop" << std::endl;
944946
emitter = topicGroup->emitters[readerName].get();
945947
if (emitter->isRunning())
946948
{
949+
std::cerr << "emitter told to stop" << std::endl;
947950
emitter->stop();
948951
}
949952
}
950953

951954

952-
DDS::ContentFilteredTopic* topicDesc =
953-
dynamic_cast<DDS::ContentFilteredTopic*>
954-
(dataReader->get_topicdescription());
955+
DDS::TopicDescription_var topic = dataReader->get_topicdescription();
956+
DDS::ContentFilteredTopic_var topicDesc = DDS::ContentFilteredTopic::_narrow(topic);
957+
958+
// We have to destroy the current data reader before building a new one
959+
// The first step is to delete the contained entities (deletes all the ReadConditions and QueryConditions)
960+
DDS::ReturnCode_t return_code = dataReader->delete_contained_entities();
961+
if (return_code != DDS::RETCODE_OK)
962+
{
963+
std::cerr << "dataReader failed on delete_contained_entities, return_code: " << return_code << std::endl;
964+
return false;
965+
}
966+
967+
// Now delete the reader
968+
return_code = subscriber->delete_datareader(dataReader);
969+
dataReader = nullptr;
970+
if (return_code != DDS::RETCODE_OK)
971+
{
972+
std::cerr << "dataReader failed on delete_datareader, return_code: " << return_code << std::endl;
973+
return false;
974+
}
955975

976+
std::string existingFilterName;
956977
// Remove this content filtered topic from the domain if it exists
957978
if (topicDesc && m_domainParticipant)
958979
{
980+
existingFilterName = topicDesc->get_name();
981+
982+
std::cout << "existingFilterName " << existingFilterName << std::endl;
983+
959984
for (auto iter = topicGroup->filteredTopics.begin();
960985
iter != topicGroup->filteredTopics.end();
961986
++iter)
@@ -965,10 +990,17 @@ bool DDSManager::replaceFilter(const std::string& topicName,
965990
continue;
966991
}
967992

968-
m_domainParticipant->delete_contentfilteredtopic(topicDesc);
993+
return_code = m_domainParticipant->delete_contentfilteredtopic(topicDesc);
994+
if (return_code == DDS::RETCODE_OK)
995+
{
969996
iter->second = nullptr;
970997
topicDesc = nullptr;
971998
topicGroup->filteredTopics.erase(iter);
999+
}
1000+
else
1001+
{
1002+
std::cerr << "domain participant failed on delete_contentfilteredtopic, return_code: " << return_code << std::endl;
1003+
}
9721004
break;
9731005
}
9741006
}
@@ -979,19 +1011,25 @@ bool DDSManager::replaceFilter(const std::string& topicName,
9791011
subscriber->delete_datareader(dataReader);
9801012
DDS::TopicDescription* targetTopic = nullptr;
9811013

982-
9831014
// Create a new filtered topic if requested
9841015
if (!filter.empty())
9851016
{
9861017
// The topic filter name must be unique or it will fail on the
9871018
// second time it's created
988-
static int counter = 0;
1019+
int counter = 0;
1020+
if (!existingFilterName.empty())
1021+
{
1022+
std::size_t found = existingFilterName.find_last_of("_");
1023+
std::string count_string = existingFilterName.substr(found + 1);
1024+
counter = std::stoi(count_string);
1025+
}
9891026
++counter;
9901027
const std::string filterName =
9911028
topicName + "_" +
9921029
readerName + "_" +
9931030
std::to_string(counter);
9941031

1032+
//Create the content filter with no swappable params
9951033
const DDS::StringSeq noParams;
9961034
DDS::ContentFilteredTopic_var filteredTopic =
9971035
m_domainParticipant->create_contentfilteredtopic(
@@ -1011,7 +1049,17 @@ bool DDSManager::replaceFilter(const std::string& topicName,
10111049

10121050
return false;
10131051
}
1052+
else
1053+
{
1054+
std::cerr << "Success in updating content filtered topic '"
1055+
<< topicName
1056+
<< "' with the filter ["
1057+
<< filter
1058+
<< "]"
1059+
<< std::endl;
1060+
}
10141061

1062+
// Save of content filter
10151063
topicGroup->filteredTopics[filterName] = filteredTopic;
10161064
targetTopic = filteredTopic;
10171065
}
@@ -1023,30 +1071,57 @@ bool DDSManager::replaceFilter(const std::string& topicName,
10231071
targetTopic = topicGroup->topic;
10241072
}
10251073

1026-
// Create the new data reader
1074+
// Create the new data reader, but first create a listener for it
10271075
auto readerListener = std::make_unique<GenericReaderListener>();
10281076
readerListener->SetHandler(m_rlHandler);
1029-
dataReader = topicGroup->subscriber->create_datareader(
1030-
targetTopic,
1031-
topicGroup->dataReaderQos,
1032-
readerListener.get(),
1033-
DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS |
1034-
DDS::SUBSCRIPTION_MATCHED_STATUS |
1035-
DDS::SAMPLE_LOST_STATUS);
10361077

1037-
if (!dataReader)
1078+
if (readerListener)
10381079
{
1039-
std::cerr << "Error creating data reader for '"
1040-
<< topicName
1041-
<< "'"
1042-
<< std::endl;
1080+
std::cerr << "Success in creating reader listener" << std::endl;
1081+
1082+
// move replaces the original listener with the new listener and since there is no longer an existing
1083+
// reference to the orginial lister it deletes itself since it is a unique pointer.
1084+
// This must be done here rather than after the new reader, otherwise there will be duplicate reader
1085+
// listeners
1086+
1087+
auto rlEmplaceRes = topicGroup->m_readerListeners.emplace(readerName, std::move(readerListener));
1088+
1089+
//create the new data reader
1090+
1091+
dataReader = topicGroup->subscriber->create_datareader(
1092+
targetTopic,
1093+
topicGroup->dataReaderQos,
1094+
rlEmplaceRes.first->second.get(),
1095+
DDS::INCONSISTENT_TOPIC_STATUS |
1096+
DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS |
1097+
DDS::SUBSCRIPTION_MATCHED_STATUS |
1098+
DDS::SAMPLE_LOST_STATUS);
1099+
1100+
if (!dataReader)
1101+
{
1102+
std::cerr << "Error creating data reader for '"
1103+
<< topicName
1104+
<< "'"
1105+
<< std::endl;
10431106

1107+
return false;
1108+
}
1109+
else
1110+
{
1111+
std::cerr << "Success in creating data reader for '"
1112+
<< topicName
1113+
<< "'"
1114+
<< std::endl;
1115+
}
1116+
}
1117+
else
1118+
{
1119+
std::cerr << "Failure in creating reader listener" << std::endl;
10441120
return false;
10451121
}
10461122

10471123
// Store the data reader with the reference name
10481124
topicGroup->readers[readerName] = dataReader;
1049-
topicGroup->m_readerListeners.emplace(readerName, std::move(readerListener));
10501125
lock.unlock();
10511126

10521127
// Restart the emitter thread with the new reader if it existed
@@ -1060,6 +1135,78 @@ bool DDSManager::replaceFilter(const std::string& topicName,
10601135

10611136
} // End DDSManager::replaceFilter
10621137

1138+
bool DDSManager::replaceFilterParams(const std::string& topicName,
1139+
const std::string& readerName,
1140+
const DDS::StringSeq &filterParams)
1141+
{
1142+
1143+
// Make sure the data reader name is valid
1144+
if (readerName.empty())
1145+
{
1146+
std::cerr << "Error replacing topic filter for '"
1147+
<< topicName
1148+
<< "'. The reader name must not be empty."
1149+
<< std::endl;
1150+
1151+
return false;
1152+
}
1153+
1154+
1155+
decltype(m_sharedLock) lock(m_topicMutex);
1156+
// Has the subscriber already been registered?
1157+
DDS::Subscriber_var subscriber = getSubscriber(topicName);
1158+
if (!subscriber)
1159+
{
1160+
std::cerr << "Error replacing topic filter for '"
1161+
<< topicName
1162+
<< "'. The subscriber has not been created."
1163+
<< std::endl;
1164+
1165+
return false;
1166+
}
1167+
1168+
// Make sure this data reader exists
1169+
DDS::DataReader_var dataReader = getReader(topicName, readerName);
1170+
if (!dataReader)
1171+
{
1172+
std::cerr << "Error replacing topic filter for '"
1173+
<< topicName
1174+
<< "'. The data reader named '"
1175+
<< readerName
1176+
<< "' does not exist."
1177+
<< std::endl;
1178+
1179+
return false;
1180+
}
1181+
1182+
// Get the ContentFilteredTopic
1183+
std::shared_ptr<TopicGroup> topicGroup = m_topics[topicName];
1184+
1185+
1186+
DDS::TopicDescription_var topic = dataReader->get_topicdescription();
1187+
DDS::ContentFilteredTopic_var topicDesc = DDS::ContentFilteredTopic::_narrow(topic);
1188+
bool status = false;
1189+
1190+
// Update this content filtered topic if it exists
1191+
if (topicDesc)
1192+
{
1193+
for (auto iter = topicGroup->filteredTopics.begin();
1194+
iter != topicGroup->filteredTopics.end();
1195+
++iter)
1196+
{
1197+
if (topicDesc == iter->second)
1198+
{
1199+
if (iter->second->set_expression_parameters(filterParams) == DDS::RETCODE_OK)
1200+
{
1201+
status = true;
1202+
}
1203+
break;
1204+
}
1205+
}
1206+
}
1207+
return status;
1208+
}
1209+
10631210

10641211
//------------------------------------------------------------------------------
10651212
bool DDSManager::setMaxDataRate(const std::string& topicName,

src/dds_manager.h

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,25 @@ class DDSManager
162162
*/
163163
bool createSubscriber(const std::string& topicName,
164164
const std::string& readerName,
165-
const std::string& filter = "");
165+
const std::string& filter = "",
166+
const DDS::StringSeq &filterParams = DDS::StringSeq());
167+
168+
/**
169+
* @brief Replace the content filter parameters for a given topic.
170+
* @details This method lets the user swap out paramaters of the filter while leaving operators intact. This is a less disruptive way of
171+
* changing filtering and it does not make the reader appear as a late joiner like the replaceFilter method does. If the user wants
172+
* to swap parameters then a filter that supports swappable parameter must first be specified in the createSubscriber or createPublisherSubscriber
173+
* methods. If the opendds.ini file has "DCPSPublisherContentFilter=0" set then filtering is on the subscriber only
174+
* and never get communiciated to the publisher. Publisher filtering may be preferred as a way to eliminate uneeded NW traffic
175+
* depending on use case.
176+
* @param[in] topicName The name of the topic.
177+
* @param[in] readerName Unique data reader name per topic.
178+
* @param[in] filter parameters to be swappped in the data filter.
179+
* @return True if the operation was successful; false otherwise.
180+
*/
181+
bool replaceFilterParams(const std::string & topicName,
182+
const std::string & readerName,
183+
const DDS::StringSeq &filterParams);
166184

167185
/**
168186
* @brief Create a new topic publisher.
@@ -180,7 +198,8 @@ class DDSManager
180198
*/
181199
bool createPublisherSubscriber(const std::string& topicName,
182200
const std::string& readerName,
183-
const std::string& filter = "");
201+
const std::string& filter = "",
202+
const DDS::StringSeq &filterParams = DDS::StringSeq());
184203

185204
/**
186205
* @brief Read a single data sample for a given topic.
@@ -276,6 +295,11 @@ class DDSManager
276295

277296
/**
278297
* @brief Replace the content filter for a given topic.
298+
* @details This method replaces the complete content filter (operators and paramters) and does so by tearing down
299+
* the reader, listener and content filter and then reconstructing them. This has the affect of making the reader appear as a late joiner
300+
* to the publisher. If the opendds.ini file has "DCPSPublisherContentFilter=0" set then filtering is on the subscriber only
301+
* and never get communiciated to the publisher. Publisher filtering may be preferred as a way to eliminate uneeded NW traffic
302+
* depending on use case.
279303
* @remarks This method is slow, so it should be used sparingly. Use the
280304
* filter parameter in takeSample or takeAllSamples when the
281305
* filter must change often.

0 commit comments

Comments
 (0)