Skip to content

Commit 28552ce

Browse files
Release participant_stateless secure builtin writer history change when authentication has finished (#5386) (#5394)
* Release `participant_stateless` secure builtin writer history change when authentication has finished (#5386) * TMP: REMOVE THIS COMMIT Signed-off-by: Mario Dominguez <[email protected]> * Refs #22033: BB test Signed-off-by: Mario Dominguez <[email protected]> * Refs #22033: Modify secure builtins initial payload size Signed-off-by: Mario Dominguez <[email protected]> * Refs #22033: Fix: release stateless msg payload pool when participant cryptography succeeds Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> (cherry picked from commit b414621) * Update waitAuthorized in PubSub fastrtps_deprecated API Signed-off-by: Mario Dominguez <[email protected]> * Fix windows compilation error Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> Co-authored-by: Mario Domínguez López <[email protected]> Co-authored-by: Mario Dominguez <[email protected]>
1 parent 8935858 commit 28552ce

File tree

7 files changed

+229
-30
lines changed

7 files changed

+229
-30
lines changed

src/cpp/rtps/security/SecurityManager.cpp

+28-5
Original file line numberDiff line numberDiff line change
@@ -1065,11 +1065,13 @@ void SecurityManager::delete_participant_stateless_message_entities()
10651065
void SecurityManager::create_participant_stateless_message_pool()
10661066
{
10671067
participant_stateless_message_writer_hattr_ =
1068-
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 20, 100 };
1068+
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
1069+
20, 100};
10691070
participant_stateless_message_reader_hattr_ =
1070-
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 5000 };
1071+
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
1072+
10, 5000};
10711073

1072-
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize() };
1074+
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE};
10731075
participant_stateless_message_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipantStatelessMessage", cfg);
10741076

10751077
PoolConfig writer_cfg = PoolConfig::from_history_attributes(participant_stateless_message_writer_hattr_);
@@ -1221,7 +1223,8 @@ void SecurityManager::delete_participant_volatile_message_secure_entities()
12211223
void SecurityManager::create_participant_volatile_message_secure_pool()
12221224
{
12231225
participant_volatile_message_secure_hattr_ =
1224-
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 0 };
1226+
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE),
1227+
10, 0 };
12251228

12261229
PoolConfig pool_cfg = PoolConfig::from_history_attributes(participant_volatile_message_secure_hattr_);
12271230
participant_volatile_message_secure_pool_ =
@@ -1720,6 +1723,7 @@ void SecurityManager::process_participant_volatile_message_secure(
17201723
const GUID_t remote_participant_key(message.message_identity().source_guid().guidPrefix,
17211724
c_EntityId_RTPSParticipant);
17221725
std::shared_ptr<ParticipantCryptoHandle> remote_participant_crypto;
1726+
DiscoveredParticipantInfo::AuthUniquePtr remote_participant_info;
17231727

17241728
// Search remote participant crypto handle.
17251729
{
@@ -1735,6 +1739,7 @@ void SecurityManager::process_participant_volatile_message_secure(
17351739
}
17361740

17371741
remote_participant_crypto = dp_it->second->get_participant_crypto();
1742+
remote_participant_info = dp_it->second->get_auth();
17381743
}
17391744
else
17401745
{
@@ -1756,12 +1761,30 @@ void SecurityManager::process_participant_volatile_message_secure(
17561761
EPROSIMA_LOG_ERROR(SECURITY, "Cannot set remote participant crypto tokens ("
17571762
<< remote_participant_key << ") - (" << exception.what() << ")");
17581763
}
1764+
else
1765+
{
1766+
// Release the change from the participant_stateless_message_writer_pool_
1767+
// As both participants have already authorized each other
1768+
1769+
if (remote_participant_info &&
1770+
remote_participant_info->change_sequence_number_ != SequenceNumber_t::unknown())
1771+
{
1772+
participant_stateless_message_writer_history_->remove_change(
1773+
remote_participant_info->change_sequence_number_);
1774+
remote_participant_info->change_sequence_number_ = SequenceNumber_t::unknown();
1775+
}
1776+
}
17591777
}
17601778
else
17611779
{
17621780
std::lock_guard<shared_mutex> _(mutex_);
17631781
remote_participant_pending_messages_.emplace(remote_participant_key, std::move(message.message_data()));
17641782
}
1783+
1784+
if (remote_participant_info)
1785+
{
1786+
restore_discovered_participant_info(remote_participant_key, remote_participant_info);
1787+
}
17651788
}
17661789
else if (message.message_class_id().compare(GMCLASSID_SECURITY_READER_CRYPTO_TOKENS) == 0)
17671790
{
@@ -1917,7 +1940,7 @@ void SecurityManager::process_participant_volatile_message_secure(
19171940
}
19181941
else
19191942
{
1920-
EPROSIMA_LOG_INFO(SECURITY, "Discarted ParticipantGenericMessage with class id " << message.message_class_id());
1943+
EPROSIMA_LOG_INFO(SECURITY, "Discarded ParticipantGenericMessage with class id " << message.message_class_id());
19211944
}
19221945
}
19231946

src/cpp/rtps/security/SecurityManager.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ struct EndpointSecurityAttributes;
6969
*/
7070
class SecurityManager : private WriterListener
7171
{
72+
static constexpr std::size_t PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE = 8192;
73+
static constexpr std::size_t PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE = 1024;
74+
7275
public:
7376

7477
/**
@@ -401,14 +404,19 @@ class SecurityManager : private WriterListener
401404
}
402405

403406
AuthenticationInfo(
404-
AuthenticationInfo&& auth)
407+
AuthenticationInfo&& auth) noexcept
405408
: identity_handle_(std::move(auth.identity_handle_))
406409
, handshake_handle_(std::move(auth.handshake_handle_))
407410
, auth_status_(auth.auth_status_)
408411
, expected_sequence_number_(auth.expected_sequence_number_)
409412
, change_sequence_number_(std::move(auth.change_sequence_number_))
410413
, event_(std::move(auth.event_))
411414
{
415+
auth.identity_handle_ = nullptr;
416+
auth.handshake_handle_ = nullptr;
417+
auth.auth_status_ = AUTHENTICATION_NOT_AVAILABLE;
418+
auth.expected_sequence_number_ = 0;
419+
auth.change_sequence_number_ = SequenceNumber_t::unknown();
412420
}
413421

414422
int32_t handshake_requests_sent_;

test/blackbox/api/dds-pim/PubSubReader.hpp

+26-5
Original file line numberDiff line numberDiff line change
@@ -834,16 +834,28 @@ class PubSubReader
834834
}
835835

836836
#if HAVE_SECURITY
837-
void waitAuthorized()
837+
void waitAuthorized(
838+
std::chrono::seconds timeout = std::chrono::seconds::zero(),
839+
unsigned int expected = 1)
838840
{
839841
std::unique_lock<std::mutex> lock(mutexAuthentication_);
840842

841843
std::cout << "Reader is waiting authorization..." << std::endl;
842844

843-
cvAuthentication_.wait(lock, [&]() -> bool
844-
{
845-
return authorized_ > 0;
846-
});
845+
if (timeout == std::chrono::seconds::zero())
846+
{
847+
cvAuthentication_.wait(lock, [&]()
848+
{
849+
return authorized_ >= expected;
850+
});
851+
}
852+
else
853+
{
854+
cvAuthentication_.wait_for(lock, timeout, [&]()
855+
{
856+
return authorized_ >= expected;
857+
});
858+
}
847859

848860
std::cout << "Reader authorization finished..." << std::endl;
849861
}
@@ -1134,6 +1146,15 @@ class PubSubReader
11341146
return *this;
11351147
}
11361148

1149+
PubSubReader& participants_allocation_properties(
1150+
size_t initial,
1151+
size_t maximum)
1152+
{
1153+
participant_qos_.allocation().participants.initial = initial;
1154+
participant_qos_.allocation().participants.maximum = maximum;
1155+
return *this;
1156+
}
1157+
11371158
PubSubReader& expect_no_allocs()
11381159
{
11391160
// TODO(Mcc): Add no allocations check code when feature is completely ready

test/blackbox/api/dds-pim/PubSubWriter.hpp

+26-5
Original file line numberDiff line numberDiff line change
@@ -700,16 +700,28 @@ class PubSubWriter
700700
}
701701

702702
#if HAVE_SECURITY
703-
void waitAuthorized()
703+
void waitAuthorized(
704+
std::chrono::seconds timeout = std::chrono::seconds::zero(),
705+
unsigned int expected = 1)
704706
{
705707
std::unique_lock<std::mutex> lock(mutexAuthentication_);
706708

707709
std::cout << "Writer is waiting authorization..." << std::endl;
708710

709-
cvAuthentication_.wait(lock, [&]() -> bool
710-
{
711-
return authorized_ > 0;
712-
});
711+
if (timeout == std::chrono::seconds::zero())
712+
{
713+
cvAuthentication_.wait(lock, [&]()
714+
{
715+
return authorized_ >= expected;
716+
});
717+
}
718+
else
719+
{
720+
cvAuthentication_.wait_for(lock, timeout, [&]()
721+
{
722+
return authorized_ >= expected;
723+
});
724+
}
713725

714726
std::cout << "Writer authorization finished..." << std::endl;
715727
}
@@ -1080,6 +1092,15 @@ class PubSubWriter
10801092
return *this;
10811093
}
10821094

1095+
PubSubWriter& participants_allocation_properties(
1096+
size_t initial,
1097+
size_t maximum)
1098+
{
1099+
participant_qos_.allocation().participants.initial = initial;
1100+
participant_qos_.allocation().participants.maximum = maximum;
1101+
return *this;
1102+
}
1103+
10831104
PubSubWriter& expect_no_allocs()
10841105
{
10851106
// TODO(Mcc): Add no allocations check code when feature is completely ready

test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp

+17-5
Original file line numberDiff line numberDiff line change
@@ -599,16 +599,28 @@ class PubSubReader
599599
}
600600

601601
#if HAVE_SECURITY
602-
void waitAuthorized()
602+
void waitAuthorized(
603+
std::chrono::seconds timeout = std::chrono::seconds::zero(),
604+
unsigned int expected = 1)
603605
{
604606
std::unique_lock<std::mutex> lock(mutexAuthentication_);
605607

606608
std::cout << "Reader is waiting authorization..." << std::endl;
607609

608-
cvAuthentication_.wait(lock, [&]() -> bool
609-
{
610-
return authorized_ > 0;
611-
});
610+
if (timeout == std::chrono::seconds::zero())
611+
{
612+
cvAuthentication_.wait(lock, [&]()
613+
{
614+
return authorized_ >= expected;
615+
});
616+
}
617+
else
618+
{
619+
cvAuthentication_.wait_for(lock, timeout, [&]()
620+
{
621+
return authorized_ >= expected;
622+
});
623+
}
612624

613625
std::cout << "Reader authorization finished..." << std::endl;
614626
}

test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp

+17-5
Original file line numberDiff line numberDiff line change
@@ -561,16 +561,28 @@ class PubSubWriter
561561
}
562562

563563
#if HAVE_SECURITY
564-
void waitAuthorized()
564+
void waitAuthorized(
565+
std::chrono::seconds timeout = std::chrono::seconds::zero(),
566+
unsigned int expected = 1)
565567
{
566568
std::unique_lock<std::mutex> lock(mutexAuthentication_);
567569

568570
std::cout << "Writer is waiting authorization..." << std::endl;
569571

570-
cvAuthentication_.wait(lock, [&]() -> bool
571-
{
572-
return authorized_ > 0;
573-
});
572+
if (timeout == std::chrono::seconds::zero())
573+
{
574+
cvAuthentication_.wait(lock, [&]()
575+
{
576+
return authorized_ >= expected;
577+
});
578+
}
579+
else
580+
{
581+
cvAuthentication_.wait_for(lock, timeout, [&]()
582+
{
583+
return authorized_ >= expected;
584+
});
585+
}
574586

575587
std::cout << "Writer authorization finished..." << std::endl;
576588
}

0 commit comments

Comments
 (0)