Skip to content

Commit 8bc31ef

Browse files
committed
Update to use distributed condition
1 parent d64d0ba commit 8bc31ef

File tree

4 files changed

+382
-453
lines changed

4 files changed

+382
-453
lines changed

tests/DCPS/RecorderReplayer/Args.h

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
/*
2-
*
3-
*
42
* Distributed under the OpenDDS License.
53
* See: http://www.opendds.org/license.html
64
*/
@@ -9,7 +7,6 @@
97
#define MESSENGER_TEST_ARGS_H
108

119
#include <dds/DCPS/transport/framework/TransportRegistry.h>
12-
1310
#include <dds/DCPS/transport/framework/TransportConfig.h>
1411
#include <dds/DCPS/transport/framework/TransportInst.h>
1512

@@ -18,6 +15,14 @@
1815
#include <ace/Log_Msg.h>
1916
#include <ace/OS_NS_stdlib.h>
2017

18+
const char* ACTOR_PUBLISHER = "Publisher";
19+
const char* ACTOR_RECORDER = "Recorder";
20+
const char* ACTOR_REPLAYER = "Replayer";
21+
const char* ACTOR_SUBSCRIBER = "Subscriber";
22+
23+
const char* EVENT_RECORDER_JOINED = "RecorderJoined";
24+
const char* EVENT_SUBSCRIBER_JOINED = "SubscriberJoined";
25+
2126
inline int
2227
parse_args(int argc, ACE_TCHAR *argv[])
2328
{
@@ -29,18 +34,13 @@ parse_args(int argc, ACE_TCHAR *argv[])
2934
while ((c = get_opts()) != -1) {
3035
switch (c) {
3136
case 't':
32-
3337
if (ACE_OS::strcmp(get_opts.opt_arg(), ACE_TEXT("udp")) == 0) {
3438
transport_type = "udp";
35-
3639
} else if (ACE_OS::strcmp(get_opts.opt_arg(), ACE_TEXT("multicast")) == 0) {
3740
transport_type = "multicast";
38-
3941
} else if (ACE_OS::strcmp(get_opts.opt_arg(), ACE_TEXT("tcp")) == 0) {
4042
transport_type = "tcp";
41-
4243
}
43-
4444
break;
4545
case 'p':
4646
thread_per_connection = true;
@@ -49,7 +49,7 @@ parse_args(int argc, ACE_TCHAR *argv[])
4949
default:
5050
ACE_ERROR_RETURN((LM_ERROR,
5151
ACE_TEXT("usage: %s [-t transport]\n"), argv[0]),
52-
-1);
52+
-1);
5353
}
5454
}
5555

@@ -61,19 +61,16 @@ parse_args(int argc, ACE_TCHAR *argv[])
6161
}
6262

6363
if (thread_per_connection) {
64-
OpenDDS::DCPS::TransportConfig_rch config =
65-
TheTransportRegistry->fix_empty_default();
64+
OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->fix_empty_default();
6665
if (config.in() == 0) {
6766
ACE_ERROR_RETURN((LM_ERROR,
6867
ACE_TEXT("no default config\n"), argv[0]),
69-
-1);
70-
}
71-
else if (config->instances_.size() < 1) {
68+
-1);
69+
} else if (config->instances_.size() < 1) {
7270
ACE_ERROR_RETURN((LM_ERROR,
7371
ACE_TEXT("no instances on default config\n"), argv[0]),
74-
-1);
75-
}
76-
else if (config->instances_.size() > 1) {
72+
-1);
73+
} else if (config->instances_.size() > 1) {
7774
ACE_ERROR((LM_ERROR,
7875
ACE_TEXT("too many instances on default config, using first\n"), argv[0]));
7976
}
@@ -84,5 +81,4 @@ parse_args(int argc, ACE_TCHAR *argv[])
8481
return 0;
8582
}
8683

87-
8884
#endif

tests/DCPS/RecorderReplayer/Publisher.cpp

Lines changed: 103 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,186 +1,149 @@
11
/*
2-
*
3-
*
42
* Distributed under the OpenDDS License.
53
* See: http://www.opendds.org/license.html
64
*/
75

8-
#include <ace/Get_Opt.h>
9-
#include <ace/Log_Msg.h>
10-
#include <ace/OS_NS_stdlib.h>
6+
#include "MessengerTypeSupportImpl.h"
7+
#include "Args.h"
8+
9+
#include <tests/Utils/DistributedConditionSet.h>
1110

1211
#include <dds/DCPS/Marked_Default_Qos.h>
1312
#include <dds/DCPS/PublisherImpl.h>
1413
#include <dds/DCPS/Service_Participant.h>
1514
#include <dds/DCPS/WaitSet.h>
16-
17-
#include "dds/DCPS/StaticIncludes.h"
15+
#include <dds/DCPS/StaticIncludes.h>
1816
#if defined ACE_AS_STATIC_LIBS && !defined OPENDDS_SAFETY_PROFILE
19-
#include <dds/DCPS/transport/udp/Udp.h>
20-
#include <dds/DCPS/transport/multicast/Multicast.h>
21-
#include <dds/DCPS/RTPS/RtpsDiscovery.h>
22-
#include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
23-
#include <dds/DCPS/transport/shmem/Shmem.h>
17+
# include <dds/DCPS/transport/udp/Udp.h>
18+
# include <dds/DCPS/transport/multicast/Multicast.h>
19+
# include <dds/DCPS/RTPS/RtpsDiscovery.h>
20+
# include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
21+
# include <dds/DCPS/transport/shmem/Shmem.h>
2422
#endif
2523

26-
#include "MessengerTypeSupportImpl.h"
27-
#include "Args.h"
24+
#include <ace/Get_Opt.h>
25+
#include <ace/Log_Msg.h>
26+
#include <ace/OS_NS_stdlib.h>
2827

2928
int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
3029
{
31-
try {
32-
// Initialize DomainParticipantFactory
33-
DDS::DomainParticipantFactory_var dpf =
34-
TheParticipantFactoryWithArgs(argc, argv);
30+
try {
31+
// Initialize DomainParticipantFactory
32+
DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);
3533

36-
int error;
37-
if ((error = parse_args(argc, argv)) != 0) {
38-
return error;
39-
}
34+
int error;
35+
if ((error = parse_args(argc, argv)) != 0) {
36+
return error;
37+
}
4038

4139
// Create DomainParticipant
42-
DDS::DomainParticipant_var participant =
43-
dpf->create_participant(4,
44-
PARTICIPANT_QOS_DEFAULT,
45-
0,
46-
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
40+
DDS::DomainParticipant_var participant = dpf->create_participant(4,
41+
PARTICIPANT_QOS_DEFAULT,
42+
0,
43+
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
4744

4845
if (!participant) {
4946
ACE_ERROR_RETURN((LM_ERROR,
5047
ACE_TEXT("ERROR: %N:%l: main() -")
5148
ACE_TEXT(" create_participant failed!\n")),
52-
-1);
49+
-1);
5350
}
5451

5552
ACE_DEBUG((LM_DEBUG, "(%P|%t) Start publisher\n"));
5653

57-
{
58-
// Register TypeSupport (Messenger::Message)
59-
Messenger::MessageTypeSupport_var ts =
60-
new Messenger::MessageTypeSupportImpl;
54+
// Register TypeSupport (Messenger::Message)
55+
Messenger::MessageTypeSupport_var ts = new Messenger::MessageTypeSupportImpl;
6156

62-
if (ts->register_type(participant, "Messenger") != DDS::RETCODE_OK) {
63-
ACE_ERROR_RETURN((LM_ERROR,
64-
ACE_TEXT("ERROR: %N:%l: main() -")
65-
ACE_TEXT(" register_type failed!\n")),
66-
-1);
67-
}
57+
if (ts->register_type(participant, "Messenger") != DDS::RETCODE_OK) {
58+
ACE_ERROR_RETURN((LM_ERROR,
59+
ACE_TEXT("ERROR: %N:%l: main() -")
60+
ACE_TEXT(" register_type failed!\n")),
61+
-1);
62+
}
6863

69-
// Create Topic (Movie Discussion List)
70-
DDS::Topic_var topic =
71-
participant->create_topic("Movie Discussion List",
72-
"Messenger",
73-
TOPIC_QOS_DEFAULT,
74-
0,
75-
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
76-
77-
if (!topic) {
78-
ACE_ERROR_RETURN((LM_ERROR,
79-
ACE_TEXT("ERROR: %N:%l: main() -")
80-
ACE_TEXT(" create_topic failed!\n")),
81-
-1);
82-
}
64+
// Create Topic (Movie Discussion List)
65+
DDS::Topic_var topic = participant->create_topic("Movie Discussion List",
66+
"Messenger",
67+
TOPIC_QOS_DEFAULT,
68+
0,
69+
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
8370

84-
// setup partition
85-
DDS::PublisherQos pub_qos;
86-
participant->get_default_publisher_qos(pub_qos);
87-
88-
DDS::StringSeq my_partition;
89-
my_partition.length(1);
90-
my_partition[0] = "One";
91-
pub_qos.partition.name = my_partition;
92-
93-
// Create Publisher
94-
DDS::Publisher_var publisher =
95-
participant->create_publisher(pub_qos,
96-
0,
97-
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
98-
99-
if (!publisher) {
100-
ACE_ERROR_RETURN((LM_ERROR,
101-
ACE_TEXT("ERROR: %N:%l: main() -")
102-
ACE_TEXT(" create_publisher failed!\n")),
103-
-1);
104-
}
71+
if (!topic) {
72+
ACE_ERROR_RETURN((LM_ERROR,
73+
ACE_TEXT("ERROR: %N:%l: main() -")
74+
ACE_TEXT(" create_topic failed!\n")),
75+
-1);
76+
}
10577

106-
// Create DataWriter
107-
DDS::DataWriter_var writer =
108-
publisher->create_datawriter(topic,
109-
DATAWRITER_QOS_DEFAULT,
110-
0,
111-
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
112-
113-
if (!writer) {
114-
ACE_ERROR_RETURN((LM_ERROR,
115-
ACE_TEXT("ERROR: %N:%l: main() -")
116-
ACE_TEXT(" create_datawriter failed!\n")),
117-
-1);
118-
}
78+
// Setup partition
79+
DDS::PublisherQos pub_qos;
80+
participant->get_default_publisher_qos(pub_qos);
11981

120-
Messenger::MessageDataWriter_var message_writer =
121-
Messenger::MessageDataWriter::_narrow(writer);
82+
DDS::StringSeq my_partition;
83+
my_partition.length(1);
84+
my_partition[0] = "One";
85+
pub_qos.partition.name = my_partition;
12286

123-
if (!message_writer) {
124-
ACE_ERROR_RETURN((LM_ERROR,
125-
ACE_TEXT("ERROR: %N:%l: main() -")
126-
ACE_TEXT(" _narrow failed!\n")),
127-
-1);
128-
}
87+
// Create Publisher
88+
DDS::Publisher_var publisher = participant->create_publisher(pub_qos,
89+
0,
90+
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
12991

130-
// Block until Subscriber is available
131-
DDS::StatusCondition_var condition = writer->get_statuscondition();
132-
condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
133-
134-
DDS::WaitSet_var ws = new DDS::WaitSet;
135-
ws->attach_condition(condition);
136-
137-
while (true) {
138-
DDS::PublicationMatchedStatus matches;
139-
if (writer->get_publication_matched_status(matches) != ::DDS::RETCODE_OK) {
140-
ACE_ERROR_RETURN((LM_ERROR,
141-
ACE_TEXT("ERROR: %N:%l: main() -")
142-
ACE_TEXT(" get_publication_matched_status failed!\n")),
143-
-1);
144-
}
145-
146-
if (matches.current_count >= 1) {
147-
break;
148-
}
149-
150-
DDS::ConditionSeq conditions;
151-
DDS::Duration_t timeout = { 60, 0 };
152-
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK) {
153-
ACE_ERROR_RETURN((LM_ERROR,
154-
ACE_TEXT("ERROR: %N:%l: main() -")
155-
ACE_TEXT(" wait failed!\n")),
156-
-1);
157-
}
158-
}
92+
if (!publisher) {
93+
ACE_ERROR_RETURN((LM_ERROR,
94+
ACE_TEXT("ERROR: %N:%l: main() -")
95+
ACE_TEXT(" create_publisher failed!\n")),
96+
-1);
97+
}
15998

160-
ws->detach_condition(condition);
99+
// Create DataWriter
100+
DDS::DataWriter_var writer = publisher->create_datawriter(topic,
101+
DATAWRITER_QOS_DEFAULT,
102+
0,
103+
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
161104

162-
// Write samples
163-
Messenger::Message message;
164-
message.subject_id = 99;
105+
if (!writer) {
106+
ACE_ERROR_RETURN((LM_ERROR,
107+
ACE_TEXT("ERROR: %N:%l: main() -")
108+
ACE_TEXT(" create_datawriter failed!\n")),
109+
-1);
110+
}
165111

166-
message.from = "Comic Book Guy";
167-
message.subject = "Review";
168-
message.text = "Worst. Movie. Ever.";
169-
message.count = 0;
112+
Messenger::MessageDataWriter_var message_writer = Messenger::MessageDataWriter::_narrow(writer);
170113

171-
for (int i = 0; i < 10; ++i) {
172-
const DDS::ReturnCode_t error = message_writer->write(message, DDS::HANDLE_NIL);
173-
++message.count;
174-
++message.subject_id;
114+
if (!message_writer) {
115+
ACE_ERROR_RETURN((LM_ERROR,
116+
ACE_TEXT("ERROR: %N:%l: main() -")
117+
ACE_TEXT(" _narrow failed!\n")),
118+
-1);
119+
}
175120

176-
if (error != DDS::RETCODE_OK) {
177-
ACE_ERROR((LM_ERROR,
178-
ACE_TEXT("ERROR: %N:%l: main() -")
179-
ACE_TEXT(" write returned %d!\n"), error));
180-
}
121+
// Block until Recorder joins
122+
DistributedConditionSet_rch dcs = OpenDDS::DCPS::make_rch<FileBasedDistributedConditionSet>();
123+
dcs->wait_for(ACTOR_PUBLISHER, ACTOR_RECORDER, EVENT_RECORDER_JOINED);
124+
125+
// Write samples
126+
Messenger::Message message;
127+
message.subject_id = 99;
128+
message.from = "Comic Book Guy";
129+
message.subject = "Review";
130+
message.text = "Worst. Movie. Ever.";
131+
message.count = 0;
132+
133+
for (int i = 0; i < 10; ++i) {
134+
const DDS::ReturnCode_t error = message_writer->write(message, DDS::HANDLE_NIL);
135+
++message.count;
136+
++message.subject_id;
137+
138+
if (error != DDS::RETCODE_OK) {
139+
ACE_ERROR((LM_ERROR,
140+
ACE_TEXT("ERROR: %N:%l: main() -")
141+
ACE_TEXT(" write returned %d!\n"), error));
181142
}
182143
}
183144

145+
// TODO(sonndinh): May wait until all samples are received by the Recorder
146+
184147
ACE_DEBUG((LM_DEBUG, "(%P|%t) Stop publisher\n"));
185148

186149
// Clean-up!

0 commit comments

Comments
 (0)