diff --git a/examples/standalone/comms/CMakeLists.txt b/examples/standalone/comms/CMakeLists.txt new file mode 100644 index 0000000000..a52dbfefea --- /dev/null +++ b/examples/standalone/comms/CMakeLists.txt @@ -0,0 +1,10 @@ +cmake_minimum_required(VERSION 3.10.2 FATAL_ERROR) + +project(ign-gazebo-comms) + +find_package(ignition-transport11 QUIET REQUIRED) +set(IGN_TRANSPORT_VER ${ignition-transport11_VERSION_MAJOR}) + +add_executable(publisher publisher.cc) +target_link_libraries(publisher + ignition-transport${IGN_TRANSPORT_VER}::core) diff --git a/examples/standalone/comms/publisher.cc b/examples/standalone/comms/publisher.cc new file mode 100644 index 0000000000..0579da04bf --- /dev/null +++ b/examples/standalone/comms/publisher.cc @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +// Example: ./publisher addr1 + +#include +#include +#include +#include +#include +#include + +#include +#include + +/// \brief Flag used to break the publisher loop and terminate the program. +static std::atomic g_terminatePub(false); + +////////////////////////////////////////////////// +/// \brief Usage function. +void usage() +{ + std::cerr << "./publisher " << std::endl; +} + +////////////////////////////////////////////////// +/// \brief Function callback executed when a SIGINT or SIGTERM signals are +/// captured. This is used to break the infinite loop that publishes messages +/// and exit the program smoothly. +void signal_handler(int _signal) +{ + if (_signal == SIGINT || _signal == SIGTERM) + g_terminatePub = true; +} + +////////////////////////////////////////////////// +int main(int argc, char **argv) +{ + if (argc != 2) + { + usage(); + return -1; + } + + // Install a signal handler for SIGINT and SIGTERM. + std::signal(SIGINT, signal_handler); + std::signal(SIGTERM, signal_handler); + + // Create a transport node and advertise a topic. + ignition::transport::Node node; + std::string topic = "/broker/msgs"; + + auto pub = node.Advertise(topic); + if (!pub) + { + std::cerr << "Error advertising topic [" << topic << "]" << std::endl; + return -1; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Prepare the message. + ignition::msgs::Dataframe msg; + msg.set_src_address("addr1"); + msg.set_dst_address(argv[1]); + + // Publish messages at 1Hz. + int counter = 0; + while (!g_terminatePub) + { + // Prepare the payload. + ignition::msgs::StringMsg payload; + payload.set_data("hello world " + std::to_string(counter)); + std::string serializedData; + if (!payload.SerializeToString(&serializedData)) + { + std::cerr << "Error serializing message\n" + << payload.DebugString() << std::endl; + } + msg.set_data(serializedData); + + if (!pub.Publish(msg)) + break; + + ++counter; + + std::cout << "Publishing hello on topic [" << topic << "]" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + return 0; +} diff --git a/examples/worlds/perfect_comms.sdf b/examples/worlds/perfect_comms.sdf new file mode 100644 index 0000000000..ff88ce7bea --- /dev/null +++ b/examples/worlds/perfect_comms.sdf @@ -0,0 +1,177 @@ + + + + + + + 0.001 + 1.0 + + + + + + + + + + + + true + 0 0 10 0 0 0 + 1 1 1 1 + 0.5 0.5 0.5 1 + + 1000 + 0.9 + 0.01 + 0.001 + + -0.5 0.1 -0.9 + + + + true + + + + + 0 0 1 + 100 100 + + + + + + + 0 0 1 + 100 100 + + + + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + + + + + + + 2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr1
+ addr1/rx +
+
+ + + -2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 0 0 1 1 + 0 0 1 1 + 0 0 1 1 + + + + + +
addr2
+ addr2/rx + + /broker/bind + /broker/unbind + +
+ +
+ +
+
diff --git a/include/ignition/gazebo/comms/Broker.hh b/include/ignition/gazebo/comms/Broker.hh new file mode 100644 index 0000000000..bf06cdf383 --- /dev/null +++ b/include/ignition/gazebo/comms/Broker.hh @@ -0,0 +1,154 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef IGNITION_GAZEBO_BROKER_HH_ +#define IGNITION_GAZEBO_BROKER_HH_ + +#include + +#include +#include +#include "ignition/gazebo/comms/MsgManager.hh" +#include "ignition/gazebo/config.hh" + +namespace ignition +{ +namespace msgs +{ + // Forward declarations. + class Boolean; + class Dataframe; + class StringMsg_V; +} +namespace gazebo +{ +// Inline bracket to help doxygen filtering. +inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE { +namespace comms +{ + // Forward declarations. + class MsgManager; + + /// \brief A class to store messages to be delivered using a comms model. + /// This class should be used in combination with a specific comms model that + /// implements the ICommsModel interface. + /// \sa ICommsModel.hh + /// The broker maintains two queues: inbound and outbound. When a client + /// sends a communication request, we'll store it in the outbound queue of + /// the sender's address. When the comms model decides that a message needs + /// to be delivered to one of the destination, it'll be stored in the inbound + /// queue of the destination's address. + /// + /// The main goal of this class is to receive the comms requests, stamp the + /// time, and place them in the appropriate outbound queue, as well as deliver + /// the messages that are in the inbound queues. + /// + /// The instance of the comms model is responsible for moving around the + /// messages from the outbound queues to the inbound queues. + /// + /// The broker can be configured with the following SDF parameters: + /// + /// * Optional parameters: + /// Element used to capture the broker parameters. This block can + /// contain any of the next parameters: + /// : Topic name where the broker receives all the incoming + /// messages. The default value is "/broker/msgs" + /// : Service name used to bind an address. + /// The default value is "/broker/bind" + /// : Service name used to unbind from an address. + /// The default value is "/broker/unbind" + /// + /// Here's an example: + /// + /// + /// /broker/inbound + /// /broker/bind_address + /// /broker/unbind_address + /// + /// + class IGNITION_GAZEBO_VISIBLE Broker + { + /// \brief Constructor. + public: Broker(); + + /// \brief Configure the broker via SDF. + /// \param[in] _sdf The SDF Element associated with the broker parameters. + public: void Load(std::shared_ptr _sdf); + + /// \brief Start handling comms services. + /// + /// This function allows us to wait to advertise capabilities to + /// clients until the broker has been entirely initialized. + public: void Start(); + + /// \brief Get the current time. + /// \return Current time. + public: std::chrono::steady_clock::duration Time() const; + + /// \brief Set the current time. + /// \param[in] _time Current time. + public: void SetTime(const std::chrono::steady_clock::duration &_time); + + /// \brief This method associates an address with a client topic used as + /// callback for receiving messages. This is a client requirement to + /// start receiving messages. + /// \param[in] _req Bind request containing the following content: + /// _req[0] Client address. + /// _req[1] Model name associated to the address. + /// _req[2] Client subscription topic. + /// \param[out] _rep Unused + /// \return True when the bind service succeeded or false otherwise. + public: bool OnBind(const ignition::msgs::StringMsg_V &_req, + ignition::msgs::Boolean &_rep); + + /// \brief Unbind a given client address. The client associated to this + /// address will not receive any more messages. + /// \param[in] _req Bind request containing the following content: + /// _req[0] Client address. + /// _req[1] Client subscription topic. + public: void OnUnbind(const ignition::msgs::StringMsg_V &_req); + + /// \brief Callback executed to process a communication request from one of + /// the clients. + /// \param[in] _msg The message from the client. + public: void OnMsg(const ignition::msgs::Dataframe &_msg); + + /// \brief Process all the messages in the inbound queue and deliver them + /// to the destination clients. + public: void DeliverMsgs(); + + /// \brief Get a mutable reference to the message manager. + /// \return The mutable reference. + public: MsgManager &DataManager(); + + /// \brief Lock the mutex to access the message manager. + public: void Lock(); + + /// \brief Unlock the mutex to access the message manager. + public: void Unlock(); + + /// \brief Private data pointer. + IGN_UTILS_UNIQUE_IMPL_PTR(dataPtr) + }; +} +} +} +} + +#endif diff --git a/include/ignition/gazebo/comms/ICommsModel.hh b/include/ignition/gazebo/comms/ICommsModel.hh new file mode 100644 index 0000000000..367cbe975b --- /dev/null +++ b/include/ignition/gazebo/comms/ICommsModel.hh @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef IGNITION_GAZEBO_ICOMMSMODEL_HH_ +#define IGNITION_GAZEBO_ICOMMSMODEL_HH_ + +#include + +#include +#include +#include "ignition/gazebo/comms/MsgManager.hh" +#include "ignition/gazebo/config.hh" +#include "ignition/gazebo/System.hh" + +namespace ignition +{ +namespace gazebo +{ +// Inline bracket to help doxygen filtering. +inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE { + + // Forward declarations + class EntityComponentManager; + class EventManager; + +namespace comms +{ + /// \brief Abstract interface to define how the environment should handle + /// communication simulation. As an example, this class could be responsible + /// for handling dropouts, decay and packet collisions. + /// + /// The derived comms models can be configured with the following SDF + /// parameters: + /// + /// * Optional parameters: + /// If defined this will allow the comms model to run at a + /// higher frequency than the physics engine. This is useful when dealing + /// with ranging. If the is set larger than the physics engine dt + /// then the comms model step size will default to dt. + /// Note: for consistency it is adviced that the dt is a multiple of timestep. + /// Units are in seconds. + /// + /// Here's an example: + /// + /// 2 + /// 1.0 + /// + /// + /// 1 + /// + class IGNITION_GAZEBO_VISIBLE ICommsModel: +#ifdef _MSC_VER + #pragma warning(push) + #pragma warning(disable:4275) +#endif + public System, +#ifdef _MSC_VER + #pragma warning(pop) +#endif + public ISystemConfigure, + public ISystemPreUpdate + { + /// \brief Constructor. + public: explicit ICommsModel(); + + // Documentation inherited. + public: void Configure(const Entity &_entity, + const std::shared_ptr &_sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) override; + + // Documentation inherited. + public: void PreUpdate( + const ignition::gazebo::UpdateInfo &_info, + ignition::gazebo::EntityComponentManager &_ecm) override; + + /// \brief This method is called when there is a timestep in the simulator. + /// \param[in] _info Simulator information about the current timestep. + /// will become the new registry. + /// \param[in] _ecm - Ignition's ECM. + public: virtual void StepImpl(const UpdateInfo &_info, + EntityComponentManager &_ecm); + + /// \brief This method is called when the system is being configured + /// override this to load any additional params for the comms model + /// \param[in] _entity The entity this plugin is attached to. + /// \param[in] _sdf The SDF Element associated with this system plugin. + /// \param[in] _ecm The EntityComponentManager of the given simulation + /// instance. + /// \param[in] _eventMgr The EventManager of the given simulation instance. + public: virtual void Load(const Entity &_entity, + std::shared_ptr _sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) = 0; + + /// \brief This method is called when there is a timestep in the simulator + /// override this to update your data structures as needed. + /// + /// Note: this is an experimental interface and might change in the future. + /// + /// \param[in] _info Simulator information about the current timestep. + /// \param[in] _currentRegistry The current registry. + /// \param[out] _newRegistry The new registry. When Step() is finished this + /// will become the new registry. + /// \param[in] _ecm - Ignition's ECM. + public: virtual void Step(const UpdateInfo &_info, + const Registry &_currentRegistry, + Registry &_newRegistry, + EntityComponentManager &_ecm) = 0; + + /// \brief Private data pointer. + IGN_UTILS_UNIQUE_IMPL_PTR(dataPtr) + }; +} +} +} +} + +#endif diff --git a/include/ignition/gazebo/comms/MsgManager.hh b/include/ignition/gazebo/comms/MsgManager.hh new file mode 100644 index 0000000000..aa5bde7f4d --- /dev/null +++ b/include/ignition/gazebo/comms/MsgManager.hh @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef IGNITION_GAZEBO_MSGMANAGER_HH_ +#define IGNITION_GAZEBO_MSGMANAGER_HH_ + +#include +#include +#include +#include + +#include +#include +#include "ignition/gazebo/config.hh" +#include "ignition/gazebo/Entity.hh" +#include "ignition/gazebo/System.hh" + +namespace ignition +{ +namespace msgs +{ + // Forward declarations. + class Dataframe; +} +namespace gazebo +{ +// Inline bracket to help doxygen filtering. +inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE { +namespace comms +{ + +/// \brief A queue of message pointers. +using DataQueue = std::deque; + +/// \brief A map where the key is the topic subscribed to an address and +/// the value is a publisher to reach that topic. +using SubscriptionHandler = + std::unordered_map; + +/// \brief All the information associated to an address. +struct AddressContent +{ + /// \brief Queue of inbound messages. + public: DataQueue inboundMsgs; + + /// \brief Queue of outbound messages. + public: DataQueue outboundMsgs; + + /// \brief Subscribers. + public: SubscriptionHandler subscriptions; + + /// \brief Model name associated to this address. + public: std::string modelName; + + /// \brief Entity of the model associated to this address. + public: gazebo::Entity entity; +}; + +/// \brief A map where the key is an address and the value is all the +/// information associated to each address (subscribers, queues, ...). +using Registry = std::unordered_map; + +/// \brief Class to handle messages and subscriptions. +class IGNITION_GAZEBO_VISIBLE MsgManager +{ + /// \brief Default constructor. + public: MsgManager(); + + /// \brief Add a new subscriber. It's possible to associate multiple topics + /// to the same address/model pair. However, the same address cannot be + /// attached to multiple models. When all the subscribers are removed, it's + /// posible to bind to this address using a different model. + /// \param[in] _address The subscriber address. + /// \param[in] _modelName The model name. + /// \param[in] _topic The subscriber topic. + /// \return True if the subscriber was successfully added or false otherwise. + public: bool AddSubscriber(const std::string &_address, + const std::string &_modelName, + const std::string &_topic); + + /// \brief Add a new message to the inbound queue. + /// \param[in] _address The destination address. + /// \param[in] _msg The message. + public: void AddInbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg); + + /// \brief Add a new message to the outbound queue. + /// \param[in] _address The sender address. + /// \param[in] _msg The message. + public: void AddOutbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg); + + /// \brief Remove an existing subscriber. + /// \param[in] _address The subscriber address. + /// \param[in] _topic The Subscriber topic. + /// \return True if the subscriber was removed or false otherwise. + public: bool RemoveSubscriber(const std::string &_address, + const std::string &_topic); + + /// \brief Remove a message from the inbound queue. + /// \param[in] _address The destination address. + /// \param[in] _Msg Message pointer to remove. + /// \return True if the message was removed or false otherwise. + public: bool RemoveInbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg); + + /// \brief Remove a message from the outbound queue. + /// \param[in] _address The sender address. + /// \param[in] _msg Message pointer to remove. + /// \return True if the message was removed or false otherwise. + public: bool RemoveOutbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg); + + /// \brief This function delivers all the messages in the inbound queue to + /// the appropriate subscribers. This function also clears the inbound queue. + public: void DeliverMsgs(); + + /// \brief Get an inmutable reference to the data containing subscriptions and + /// data queues. + /// \return A const reference to the data. + public: const Registry &DataConst() const; + + /// \brief Get a mutable reference to the data containing subscriptions and + /// data queues. + /// \return A mutable reference to the data. + public: Registry &Data(); + + /// \brief Get a copy of the data structure containing subscriptions and data + /// queues. + /// \return A copy of the data. + public: Registry Copy() const; + + /// \brief Set the data structure containing subscriptions and data queues. + /// \param[in] _newContent New content to be set. + public: void Set(const Registry &_newContent); + + /// \brief Private data pointer. + IGN_UTILS_UNIQUE_IMPL_PTR(dataPtr) +}; +} +} +} +} + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 764d1e0bd6..2aef8d0996 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,6 +26,12 @@ set(network_sources network/PeerTracker.cc ) +set(comms_sources + comms/Broker.cc + comms/ICommsModel.cc + comms/MsgManager.cc +) + set(gui_sources ${gui_sources} PARENT_SCOPE @@ -68,6 +74,7 @@ set (sources cmd/ModelCommandAPI.cc ${PROTO_PRIVATE_SRC} ${network_sources} + ${comms_sources} ) set (gtest_sources @@ -94,6 +101,8 @@ set (gtest_sources Util_TEST.cc World_TEST.cc ign_TEST.cc + comms/Broker_TEST.cc + comms/MsgManager_TEST.cc network/NetworkConfig_TEST.cc network/PeerTracker_TEST.cc network/NetworkManager_TEST.cc diff --git a/src/comms/Broker.cc b/src/comms/Broker.cc new file mode 100644 index 0000000000..8c33c5fa21 --- /dev/null +++ b/src/comms/Broker.cc @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#include +#include +#include + +#include +#include "ignition/gazebo/comms/Broker.hh" +#include "ignition/gazebo/comms/MsgManager.hh" +#include "ignition/gazebo/Conversions.hh" +#include "ignition/gazebo/Util.hh" + +/// \brief Private Broker data class. +class ignition::gazebo::comms::Broker::Implementation +{ + /// \brief The message manager. + public: MsgManager data; + + /// \brief Protect data from races. + public: std::mutex mutex; + + /// \brief Topic used to centralize all messages sent from the agents. + public: std::string msgTopic = "/broker/msgs"; + + /// \brief Service used to bind to an address. + public: std::string bindSrv = "/broker/bind"; + + /// \brief Service used to unbind from an address. + public: std::string unbindSrv = "/broker/unbind"; + + /// \brief The current time. + public: std::chrono::steady_clock::duration time{0}; + + /// \brief An Ignition Transport node for communications. + public: std::unique_ptr node; +}; + +using namespace ignition; +using namespace gazebo; +using namespace comms; + +////////////////////////////////////////////////// +Broker::Broker() + : dataPtr(ignition::utils::MakeUniqueImpl()) +{ + this->dataPtr->node = std::make_unique(); +} + +////////////////////////////////////////////////// +void Broker::Load(std::shared_ptr _sdf) +{ + if (!_sdf->HasElement("broker")) + return; + + sdf::ElementPtr elem = _sdf->Clone()->GetElement("broker"); + this->dataPtr->msgTopic = + elem->Get("messages_topic", this->dataPtr->msgTopic).first; + this->dataPtr->bindSrv = + elem->Get("bind_service", this->dataPtr->bindSrv).first; + this->dataPtr->unbindSrv = + elem->Get("unbind_service", this->dataPtr->unbindSrv).first; +} + +////////////////////////////////////////////////// +void Broker::Start() +{ + // Advertise the service for binding addresses. + if (!this->dataPtr->node->Advertise(this->dataPtr->bindSrv, + &Broker::OnBind, this)) + { + ignerr << "Error advertising srv [" << this->dataPtr->bindSrv << "]" + << std::endl; + return; + } + + // Advertise the service for unbinding addresses. + if (!this->dataPtr->node->Advertise(this->dataPtr->unbindSrv, + &Broker::OnUnbind, this)) + { + ignerr << "Error advertising srv [" << this->dataPtr->unbindSrv << "]" + << std::endl; + return; + } + + // Advertise the topic for receiving data messages. + if (!this->dataPtr->node->Subscribe(this->dataPtr->msgTopic, + &Broker::OnMsg, this)) + { + ignerr << "Error subscribing to topic [" << this->dataPtr->msgTopic << "]" + << std::endl; + return; + } + + igndbg << "Broker services:" << std::endl; + igndbg << " Bind: [" << this->dataPtr->bindSrv << "]" << std::endl; + igndbg << " Unbind: [" << this->dataPtr->unbindSrv << "]" << std::endl; + igndbg << "Broker topics:" << std::endl; + igndbg << " Incoming messages: [" << this->dataPtr->msgTopic << "]" + << std::endl; +} + +////////////////////////////////////////////////// +std::chrono::steady_clock::duration Broker::Time() const +{ + return this->dataPtr->time; +} + +////////////////////////////////////////////////// +void Broker::SetTime(const std::chrono::steady_clock::duration &_time) +{ + this->dataPtr->time = _time; +} + +////////////////////////////////////////////////// +bool Broker::OnBind(const ignition::msgs::StringMsg_V &_req, + ignition::msgs::Boolean &/*_rep*/) +{ + auto count = _req.data_size(); + if (count != 3) + { + ignerr << "Receive incorrect number of arguments. " + << "Expecting 3 and receive " << count << std::endl; + return false; + } + + std::string address = _req.data(0); + std::string model = _req.data(1); + std::string topic = _req.data(2); + + std::lock_guard lock(this->dataPtr->mutex); + + if (!this->DataManager().AddSubscriber(address, model, topic)) + return false; + + ignmsg << "Address [" << address << "] bound to model [" << model + << "] on topic [" << topic << "]" << std::endl; + + return true; +} + +////////////////////////////////////////////////// +void Broker::OnUnbind(const ignition::msgs::StringMsg_V &_req) +{ + auto count = _req.data_size(); + if (count != 2) + { + ignerr << "Received incorrect number of arguments. " + << "Expecting 2 and received " << count << std::endl; + return; + } + + std::string address = _req.data(0); + std::string topic = _req.data(1); + + std::lock_guard lock(this->dataPtr->mutex); + this->DataManager().RemoveSubscriber(address, topic); + + ignmsg << "Address [" << address << "] unbound on topic [" + << topic << "]" << std::endl; +} + +////////////////////////////////////////////////// +void Broker::OnMsg(const ignition::msgs::Dataframe &_msg) +{ + // Place the message in the outbound queue of the sender. + auto msgPtr = std::make_shared(_msg); + + std::lock_guard lock(this->dataPtr->mutex); + + // Stamp the time. + msgPtr->mutable_header()->mutable_stamp()->CopyFrom( + gazebo::convert(this->dataPtr->time)); + + this->DataManager().AddOutbound(_msg.src_address(), msgPtr); +} + +////////////////////////////////////////////////// +void Broker::DeliverMsgs() +{ + std::lock_guard lock(this->dataPtr->mutex); + this->DataManager().DeliverMsgs(); +} + +////////////////////////////////////////////////// +MsgManager &Broker::DataManager() +{ + return this->dataPtr->data; +} + +////////////////////////////////////////////////// +void Broker::Lock() +{ + this->dataPtr->mutex.lock(); +} + +////////////////////////////////////////////////// +void Broker::Unlock() +{ + this->dataPtr->mutex.unlock(); +} diff --git a/src/comms/Broker_TEST.cc b/src/comms/Broker_TEST.cc new file mode 100644 index 0000000000..66e263fa9b --- /dev/null +++ b/src/comms/Broker_TEST.cc @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include +#include +#include + +#include "ignition/gazebo/comms/Broker.hh" +#include "ignition/gazebo/comms/MsgManager.hh" +#include "helpers/EnvTestFixture.hh" + +using namespace ignition; +using namespace gazebo; + +/// \brief Tests for Broker class +class BrokerTest : public InternalFixture<::testing::Test> +{ +}; + +///////////////////////////////////////////////// +TEST_F(BrokerTest, Broker) +{ + comms::Broker broker; + + // Test locking / unlocking and accessing data. + EXPECT_NO_THROW(broker.Start()); + EXPECT_NO_THROW(broker.Lock()); + auto &allData = broker.DataManager().Data(); + EXPECT_TRUE(allData.empty()); + EXPECT_NO_THROW(broker.Unlock()); + + // Test manually binding with an incorrect number of arguments. + msgs::StringMsg_V wrongReqBind; + wrongReqBind.add_data("addr1"); + wrongReqBind.add_data("model1"); + ignition::msgs::Boolean unused; + EXPECT_FALSE(broker.OnBind(wrongReqBind, unused)); + allData = broker.DataManager().Data(); + EXPECT_EQ(0u, allData.size()); + + // Test manually binding address and topic. + msgs::StringMsg_V reqBind; + reqBind.add_data("addr1"); + reqBind.add_data("model1"); + reqBind.add_data("topic"); + EXPECT_TRUE(broker.OnBind(reqBind, unused)); + EXPECT_EQ(1u, allData.size()); + EXPECT_EQ(1u, allData["addr1"].subscriptions.size()); + EXPECT_NE(allData["addr1"].subscriptions.end(), + allData["addr1"].subscriptions.find("topic")); + + // Test manually adding a msg. + EXPECT_TRUE(allData["addr1"].outboundMsgs.empty()); + msgs::Dataframe msg; + msg.set_src_address("addr1"); + broker.OnMsg(msg); + EXPECT_EQ(1u, allData["addr1"].outboundMsgs.size()); + EXPECT_EQ("addr1", allData["addr1"].outboundMsgs[0u]->src_address()); + + // Test manually unbinding with an incorrect number of arguments. + msgs::StringMsg_V wrongReqUnbind; + wrongReqUnbind.add_data("addr1"); + broker.OnUnbind(wrongReqUnbind); + EXPECT_EQ(1u, allData.size()); + EXPECT_FALSE(allData["addr1"].subscriptions.empty()); + + // Test manually unbinding address and topic. + msgs::StringMsg_V reqUnbind; + reqUnbind.add_data("addr1"); + reqUnbind.add_data("topic"); + broker.OnUnbind(reqUnbind); + EXPECT_EQ(1u, allData.size()); + EXPECT_TRUE(allData["addr1"].subscriptions.empty()); + + // Test msg delivery. + auto msgIn = std::make_shared(); + allData["addr2"].inboundMsgs.push_back(msgIn); + EXPECT_EQ(1u, allData["addr2"].inboundMsgs.size()); + broker.DeliverMsgs(); + EXPECT_TRUE(allData["addr2"].inboundMsgs.empty()); + + // Test time. + const std::chrono::steady_clock::duration time0{0}; + const std::chrono::steady_clock::duration time1{1}; + EXPECT_EQ(time0, broker.Time()); + broker.SetTime(time1); + EXPECT_EQ(time1, broker.Time()); +} diff --git a/src/comms/ICommsModel.cc b/src/comms/ICommsModel.cc new file mode 100644 index 0000000000..7b0074821c --- /dev/null +++ b/src/comms/ICommsModel.cc @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include + +#include +#include +#include "ignition/gazebo/comms/Broker.hh" +#include "ignition/gazebo/comms/ICommsModel.hh" +#include "ignition/gazebo/comms/MsgManager.hh" +#include "ignition/gazebo/EntityComponentManager.hh" +#include "ignition/gazebo/EventManager.hh" + +using namespace ignition; +using namespace gazebo; +using namespace comms; + +/// \brief Private ICommsModel data class. +class ignition::gazebo::comms::ICommsModel::Implementation +{ + /// \brief Broker instance. + public: Broker broker; + + /// \brief The step size for each step iteration. + public: std::optional + timeStep = std::nullopt; + + /// \brief Current time. + public: std::chrono::steady_clock::time_point currentTime; +}; + +////////////////////////////////////////////////// +ICommsModel::ICommsModel() + : dataPtr(ignition::utils::MakeUniqueImpl()) +{ +} + +////////////////////////////////////////////////// +void ICommsModel::Configure(const Entity &_entity, + const std::shared_ptr &_sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) +{ + // Parse the optional . + if (_sdf->HasElement("step_size")) + { + this->dataPtr->timeStep = std::chrono::duration( + static_cast(_sdf->Get("step_size") * 1e9)); + } + this->Load(_entity, _sdf, _ecm, _eventMgr); + this->dataPtr->broker.Load(_sdf); + this->dataPtr->broker.Start(); +} + +////////////////////////////////////////////////// +void ICommsModel::PreUpdate(const ignition::gazebo::UpdateInfo &_info, + ignition::gazebo::EntityComponentManager &_ecm) +{ + IGN_PROFILE("ICommsModel::PreUpdate"); + + if (_info.paused) + return; + + this->dataPtr->currentTime = + std::chrono::steady_clock::time_point(_info.simTime); + + if (!this->dataPtr->timeStep.has_value()) + { + // If no step_size is defined simply execute one step of the comms model + this->StepImpl(_info, _ecm); + } + else + { + // Otherwise step at the specified time step until we converge on the + // final timestep. If the timestep is larger than the dt, then dt will + // be used. + auto endTime = this->dataPtr->currentTime + _info.dt; + + while (this->dataPtr->currentTime < endTime) + { + ignition::gazebo::UpdateInfo info(_info); + info.dt = std::min(this->dataPtr->timeStep.value(), _info.dt); + info.simTime = this->dataPtr->currentTime.time_since_epoch(); + this->StepImpl(_info, _ecm); + this->dataPtr->currentTime += info.dt; + } + } +} + +////////////////////////////////////////////////// +void ICommsModel::StepImpl(const UpdateInfo &_info, + EntityComponentManager &_ecm) +{ + // We lock while we manipulate data. + this->dataPtr->broker.Lock(); + + // Update the time in the broker. + this->dataPtr->broker.SetTime(_info.simTime); + + // Step the comms model. + const Registry ¤tRegistry = + this->dataPtr->broker.DataManager().DataConst(); + Registry newRegistry = this->dataPtr->broker.DataManager().Copy(); + this->Step(_info, currentRegistry, newRegistry, _ecm); + this->dataPtr->broker.DataManager().Set(newRegistry); + + this->dataPtr->broker.Unlock(); + + // Deliver the inbound messages. + this->dataPtr->broker.DeliverMsgs(); +} diff --git a/src/comms/MsgManager.cc b/src/comms/MsgManager.cc new file mode 100644 index 0000000000..4219588099 --- /dev/null +++ b/src/comms/MsgManager.cc @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include +#include + +#include +#include +#include "ignition/gazebo/config.hh" +#include "ignition/gazebo/comms/MsgManager.hh" + +/// \brief Private MsgManager data class. +class ignition::gazebo::comms::MsgManager::Implementation +{ + /// \brief Buffer to store the content associated to each address. + /// The key is an address. The value contains all the information associated + /// to the address. + public: Registry data; + + /// \brief An Ignition Transport node for communications. + public: std::unique_ptr node; +}; + +using namespace ignition; +using namespace gazebo; +using namespace comms; + +////////////////////////////////////////////////// +MsgManager::MsgManager() + : dataPtr(ignition::utils::MakeUniqueImpl()) +{ + this->dataPtr->node = std::make_unique(); +} + +////////////////////////////////////////////////// +bool MsgManager::AddSubscriber(const std::string &_address, + const std::string &_modelName, + const std::string &_topic) +{ + auto it = this->dataPtr->data.find(_address); + if (it != this->dataPtr->data.end()) + { + if (!it->second.modelName.empty() && it->second.modelName != _modelName) + { + ignerr << "AddSubscriber() error: Address already attached to a different" + << " model" << std::endl; + return false; + } + } + this->dataPtr->data[_address].modelName = _modelName; + + ignition::transport::Node::Publisher publisher = + this->dataPtr->node->Advertise(_topic); + + this->dataPtr->data[_address].subscriptions[_topic] = publisher; + return true; +} + +////////////////////////////////////////////////// +void MsgManager::AddInbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg) +{ + this->dataPtr->data[_address].inboundMsgs.push_back(_msg); +} + +////////////////////////////////////////////////// +void MsgManager::AddOutbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg) +{ + this->dataPtr->data[_address].outboundMsgs.push_back(_msg); +} + +////////////////////////////////////////////////// +bool MsgManager::RemoveSubscriber(const std::string &_address, + const std::string &_topic) +{ + auto it = this->dataPtr->data.find(_address); + if (it == this->dataPtr->data.end()) + { + ignerr << "RemoveSubscriber() error: Unable to find address [" + << _address << "]" << std::endl; + return false; + } + + auto res = it->second.subscriptions.erase(_topic) > 0; + + // It there are no subscribers we clear the model name. This way the address + // can be bound to a separate model. We also clear the queues. + if (it->second.subscriptions.empty()) + it->second.modelName = ""; + + return res; +} + +////////////////////////////////////////////////// +bool MsgManager::RemoveInbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg) +{ + auto it = this->dataPtr->data.find(_address); + if (it == this->dataPtr->data.end()) + { + ignerr << "RemoveInbound() error: Unable to find address [" + << _address << "]" << std::endl; + return false; + } + + auto &q = it->second.inboundMsgs; + q.erase(std::remove(q.begin(), q.end(), _msg), q.end()); + return true; +} + +////////////////////////////////////////////////// +bool MsgManager::RemoveOutbound(const std::string &_address, + const msgs::DataframeSharedPtr &_msg) +{ + auto it = this->dataPtr->data.find(_address); + if (it == this->dataPtr->data.end()) + { + ignerr << "RemoveOutbound() error: Unable to find address [" + << _address << "]" << std::endl; + return false; + } + + auto &q = it->second.outboundMsgs; + q.erase(std::remove(q.begin(), q.end(), _msg), q.end()); + return true; +} + +////////////////////////////////////////////////// +void MsgManager::DeliverMsgs() +{ + for (auto & [address, content] : this->dataPtr->data) + { + // Reference to the inbound queue for this address. + auto &inbound = content.inboundMsgs; + + // All these messages need to be delivered. + for (auto &msg : inbound) + { + // Use the publisher associated to the destination address. + for (auto & [topic, publisher] : content.subscriptions) + publisher.Publish(*msg); + } + + content.inboundMsgs.clear(); + } +} + +////////////////////////////////////////////////// +const Registry &MsgManager::DataConst() const +{ + return this->dataPtr->data; +} + +////////////////////////////////////////////////// +Registry &MsgManager::Data() +{ + return this->dataPtr->data; +} + +////////////////////////////////////////////////// +Registry MsgManager::Copy() const +{ + return this->dataPtr->data; +} + +////////////////////////////////////////////////// +void MsgManager::Set(const Registry &_newContent) +{ + this->dataPtr->data = _newContent; +} diff --git a/src/comms/MsgManager_TEST.cc b/src/comms/MsgManager_TEST.cc new file mode 100644 index 0000000000..b0e0989405 --- /dev/null +++ b/src/comms/MsgManager_TEST.cc @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include + +#include + +#include "ignition/gazebo/comms/MsgManager.hh" +#include "helpers/EnvTestFixture.hh" + +using namespace ignition; +using namespace gazebo; + +/// \brief Tests for MsgManager class +class MsgManagerTest : public InternalFixture<::testing::Test> +{ +}; + +///////////////////////////////////////////////// +TEST_F(MsgManagerTest, MsgManager) +{ + comms::MsgManager msgManager; + + EXPECT_TRUE(msgManager.Data().empty()); + EXPECT_TRUE(msgManager.Copy().empty()); + + // Test subscriber. + EXPECT_TRUE(msgManager.AddSubscriber("addr1", "model1", "topic1")); + EXPECT_EQ(1u, msgManager.Data().size()); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(1u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic1")); + EXPECT_EQ("model1", msgManager.Data()["addr1"].modelName); + + // Try to bind to an address attached to another model. + EXPECT_FALSE(msgManager.AddSubscriber("addr1", "model2", "topic2")); + EXPECT_EQ(1u, msgManager.Data().size()); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(1u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic1")); + EXPECT_EQ("model1", msgManager.Data()["addr1"].modelName); + + // Add an additional topic. + EXPECT_TRUE(msgManager.AddSubscriber("addr1", "model1", "topic2")); + EXPECT_EQ(1u, msgManager.Data().size()); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(2u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic1")); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic2")); + EXPECT_EQ("model1", msgManager.Data()["addr1"].modelName); + + // Test inbound. + auto msgIn = std::make_shared(); + EXPECT_EQ(msgManager.Data().end(), msgManager.Data().find("addr2")); + msgManager.AddInbound("addr2", msgIn); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr2")); + EXPECT_FALSE(msgManager.Data()["addr2"].inboundMsgs.empty()); + EXPECT_EQ(msgIn, msgManager.Data()["addr2"].inboundMsgs[0]); + + // Test outbound. + auto msgOut = std::make_shared(); + EXPECT_EQ(msgManager.Data().end(), msgManager.Data().find("addr3")); + msgManager.AddOutbound("addr3", msgOut); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr3")); + EXPECT_FALSE(msgManager.Data()["addr3"].outboundMsgs.empty()); + EXPECT_EQ(msgOut, msgManager.Data()["addr3"].outboundMsgs[0]); + + // Test msg removal. + EXPECT_FALSE(msgManager.RemoveInbound("not_found", msgIn)); + EXPECT_TRUE(msgManager.RemoveInbound("addr2", msgIn)); + EXPECT_TRUE(msgManager.Data()["addr2"].inboundMsgs.empty()); + EXPECT_FALSE(msgManager.RemoveOutbound("not_found", msgOut)); + EXPECT_TRUE(msgManager.RemoveOutbound("addr3", msgOut)); + EXPECT_TRUE(msgManager.Data()["addr3"].outboundMsgs.empty()); + + // Test msg delivery. + msgManager.AddInbound("addr4", msgIn); + EXPECT_FALSE(msgManager.Data()["addr4"].inboundMsgs.empty()); + msgManager.DeliverMsgs(); + EXPECT_TRUE(msgManager.Data()["addr4"].inboundMsgs.empty()); + + // Test subscriber removal. + msgManager.RemoveSubscriber("addr1", "topic1"); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(1u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic2")); + msgManager.RemoveSubscriber("addr1", "topic2"); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(0u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_TRUE(msgManager.Data()["addr1"].modelName.empty()); + + // Without subscribers, we should be able to recycle the address for a + // different model. + EXPECT_TRUE(msgManager.AddSubscriber("addr1", "model2", "topic2")); + EXPECT_EQ(4u, msgManager.Data().size()); + EXPECT_NE(msgManager.Data().end(), msgManager.Data().find("addr1")); + EXPECT_EQ(1u, msgManager.Data()["addr1"].subscriptions.size()); + EXPECT_NE(msgManager.Data()["addr1"].subscriptions.end(), + msgManager.Data()["addr1"].subscriptions.find("topic2")); + EXPECT_EQ("model2", msgManager.Data()["addr1"].modelName); + + // Test setting msg content. + auto msgIn2 = std::make_shared(); + auto msgOut2 = std::make_shared(); + std::unordered_map content; + content["addr6"].inboundMsgs.push_back(msgIn2); + content["addr6"].outboundMsgs.push_back(msgOut2); + msgManager.Set(content); + EXPECT_TRUE(msgManager.Data()["addr6"].subscriptions.empty()); + EXPECT_EQ(1u, msgManager.Data()["addr6"].inboundMsgs.size()); + EXPECT_EQ(msgIn2, msgManager.Data()["addr6"].inboundMsgs[0u]); + EXPECT_EQ(1u, msgManager.Data()["addr6"].outboundMsgs.size()); + EXPECT_EQ(msgOut2, msgManager.Data()["addr6"].outboundMsgs[0u]); + + // Test copy. + EXPECT_TRUE(msgManager.Copy()["addr6"].subscriptions.empty()); + EXPECT_EQ(1u, msgManager.Copy()["addr6"].inboundMsgs.size()); + EXPECT_EQ(msgIn2, msgManager.Copy()["addr6"].inboundMsgs[0u]); + EXPECT_EQ(1u, msgManager.Copy()["addr6"].outboundMsgs.size()); + EXPECT_EQ(msgOut2, msgManager.Copy()["addr6"].outboundMsgs[0u]); + + // Test DataConst. + auto it = msgManager.DataConst().find("addr6"); + EXPECT_TRUE(it->second.subscriptions.empty()); + +} diff --git a/src/systems/CMakeLists.txt b/src/systems/CMakeLists.txt index 3983736a68..ad6ba7ba4a 100644 --- a/src/systems/CMakeLists.txt +++ b/src/systems/CMakeLists.txt @@ -102,6 +102,7 @@ add_subdirectory(breadcrumbs) add_subdirectory(buoyancy) add_subdirectory(buoyancy_engine) add_subdirectory(collada_world_exporter) +add_subdirectory(comms_endpoint) add_subdirectory(contact) add_subdirectory(camera_video_recorder) add_subdirectory(detachable_joint) @@ -136,6 +137,7 @@ add_subdirectory(optical_tactile_plugin) add_subdirectory(particle_emitter) add_subdirectory(particle_emitter2) add_subdirectory(performer_detector) +add_subdirectory(perfect_comms) add_subdirectory(physics) add_subdirectory(pose_publisher) add_subdirectory(scene_broadcaster) diff --git a/src/systems/comms_endpoint/CMakeLists.txt b/src/systems/comms_endpoint/CMakeLists.txt new file mode 100644 index 0000000000..c0ca01e034 --- /dev/null +++ b/src/systems/comms_endpoint/CMakeLists.txt @@ -0,0 +1,6 @@ +gz_add_system(comms-endpoint + SOURCES + CommsEndpoint.cc + PUBLIC_LINK_LIBS + ignition-common${IGN_COMMON_VER}::ignition-common${IGN_COMMON_VER} +) diff --git a/src/systems/comms_endpoint/CommsEndpoint.cc b/src/systems/comms_endpoint/CommsEndpoint.cc new file mode 100644 index 0000000000..98c4522bac --- /dev/null +++ b/src/systems/comms_endpoint/CommsEndpoint.cc @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include "ignition/gazebo/Model.hh" +#include "ignition/gazebo/Util.hh" +#include "CommsEndpoint.hh" + +using namespace ignition; +using namespace gazebo; +using namespace systems; + +class ignition::gazebo::systems::CommsEndpoint::Implementation +{ + /// \brief Send the bind request. + public: void Bind(); + + /// \brief Service response callback. + /// \brief \param[in] _rep Unused. + /// \brief \param[in] _result Bind result. + public: void BindCallback(const ignition::msgs::Boolean &_rep, + const bool _result); + + /// \brief The address. + public: std::string address; + + /// \brief The topic where the messages will be delivered. + public: std::string topic; + + /// \brief Model interface + public: Model model{kNullEntity}; + + /// \brief True when the address has been bound in the broker. + public: std::atomic_bool bound{false}; + + /// \brief Service where the broker is listening bind requests. + public: std::string bindSrv = "/broker/bind"; + + /// \brief Service where the broker is listening unbind requests. + public: std::string unbindSrv = "/broker/unbind"; + + /// \brief Message to send the bind request. + public: ignition::msgs::StringMsg_V bindReq; + + /// \brief Message to send the unbind request. + public: ignition::msgs::StringMsg_V unbindReq; + + /// \brief Time between bind retries (secs). + public: std::chrono::steady_clock::duration bindRequestPeriod{1}; + + /// \brief Last simulation time we tried to bind. + public: std::chrono::steady_clock::duration lastBindRequestTime{-2}; + + /// \brief The ignition transport node. + public: std::unique_ptr node; +}; + +////////////////////////////////////////////////// +void CommsEndpoint::Implementation::BindCallback( + const ignition::msgs::Boolean &/*_rep*/, const bool _result) +{ + if (_result) + this->bound = true; + + igndbg << "Succesfuly bound to [" << this->address << "] on topic [" + << this->topic << "]" << std::endl; +} + +////////////////////////////////////////////////// +void CommsEndpoint::Implementation::Bind() +{ + this->node->Request(this->bindSrv, this->bindReq, + &CommsEndpoint::Implementation::BindCallback, this); +} + +////////////////////////////////////////////////// +CommsEndpoint::CommsEndpoint() + : dataPtr(ignition::utils::MakeUniqueImpl()) +{ + this->dataPtr->node = std::make_unique(); +} + +////////////////////////////////////////////////// +CommsEndpoint::~CommsEndpoint() +{ + if (!this->dataPtr->bound) + return; + + // Unbind. + // We use a oneway request because we're not going + // to be alive to check the result or retry. + this->dataPtr->node->Request( + this->dataPtr->unbindSrv, this->dataPtr->unbindReq); +} + +////////////////////////////////////////////////// +void CommsEndpoint::Configure(const Entity &_entity, + const std::shared_ptr &_sdf, + EntityComponentManager &_ecm, + EventManager &/*_eventMgr*/) +{ + // Parse
. + if (!_sdf->HasElement("address")) + { + ignerr << "No
specified." << std::endl; + return; + } + this->dataPtr->address = _sdf->Get("address"); + + // Parse . + if (!_sdf->HasElement("topic")) + { + ignerr << "No specified." << std::endl; + return; + } + this->dataPtr->topic = _sdf->Get("topic"); + + // Parse . + if (_sdf->HasElement("broker")) + { + sdf::ElementPtr elem = _sdf->Clone()->GetElement("broker"); + this->dataPtr->bindSrv = + elem->Get("bind_service", this->dataPtr->bindSrv).first; + this->dataPtr->unbindSrv = + elem->Get("unbind_service", this->dataPtr->unbindSrv).first; + } + + // Set model. + this->dataPtr->model = Model(_entity); + + // Prepare the bind parameters. + this->dataPtr->bindReq.add_data(this->dataPtr->address); + this->dataPtr->bindReq.add_data(this->dataPtr->model.Name(_ecm)); + this->dataPtr->bindReq.add_data(this->dataPtr->topic); + + // Prepare the unbind parameters. + this->dataPtr->unbindReq.add_data(this->dataPtr->address); + this->dataPtr->unbindReq.add_data(this->dataPtr->topic); +} + +////////////////////////////////////////////////// +void CommsEndpoint::PreUpdate( + const ignition::gazebo::UpdateInfo &_info, + ignition::gazebo::EntityComponentManager &/*_ecm*/) +{ + IGN_PROFILE("CommsEndpoint::PreUpdate"); + + if (this->dataPtr->bound) + return; + + auto elapsed = _info.simTime - this->dataPtr->lastBindRequestTime; + if (elapsed > std::chrono::steady_clock::duration::zero() && + elapsed < this->dataPtr->bindRequestPeriod) + { + return; + } + this->dataPtr->lastBindRequestTime = _info.simTime; + + // Let's try to bind. + this->dataPtr->Bind(); +} + +IGNITION_ADD_PLUGIN(CommsEndpoint, + ignition::gazebo::System, + CommsEndpoint::ISystemConfigure, + CommsEndpoint::ISystemPreUpdate) + +IGNITION_ADD_PLUGIN_ALIAS(CommsEndpoint, + "ignition::gazebo::systems::CommsEndpoint") diff --git a/src/systems/comms_endpoint/CommsEndpoint.hh b/src/systems/comms_endpoint/CommsEndpoint.hh new file mode 100644 index 0000000000..a21e9b3089 --- /dev/null +++ b/src/systems/comms_endpoint/CommsEndpoint.hh @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef IGNITION_GAZEBO_SYSTEMS_COMMSENDPOINT_HH_ +#define IGNITION_GAZEBO_SYSTEMS_COMMSENDPOINT_HH_ + +#include + +#include +#include +#include "ignition/gazebo/System.hh" + +namespace ignition +{ +namespace gazebo +{ +// Inline bracket to help doxygen filtering. +inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE { +namespace systems +{ + /// \brief A system that registers in the comms broker an endpoint. + /// You're creating an address attached to the model where the plugin is + /// running. The system will bind this address in the broker automatically + /// for you and unbind it when the model is destroyed. + /// + /// The endpoint can be configured with the following SDF parameters: + /// + /// * Required parameters: + ///
An identifier used to receive messages (string). + /// The topic name where you want to receive the messages targeted to + /// this address. + /// + /// * Optional parameters: + /// Element used to capture where are the broker services. + /// This block can contain any of the next optional parameters: + /// : Service name used to bind an address. + /// The default value is "/broker/bind" + /// : Service name used to unbind from an address. + /// The default value is "/broker/unbind" + /// + /// Here's an example: + /// + ///
addr1
+ /// addr1/rx + /// + /// /broker/bind + /// /broker/unbind + /// + ///
+ class CommsEndpoint + : public System, + public ISystemConfigure, + public ISystemPreUpdate + { + /// \brief Constructor + public: CommsEndpoint(); + + /// \brief Destructor + public: ~CommsEndpoint(); + + // Documentation inherited + public: void Configure(const Entity &_entity, + const std::shared_ptr &_sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) override; + + // Documentation inherited + public: void PreUpdate( + const ignition::gazebo::UpdateInfo &_info, + ignition::gazebo::EntityComponentManager &_ecm) override; + + /// \brief Private data pointer. + IGN_UTILS_UNIQUE_IMPL_PTR(dataPtr) + }; + } +} +} +} + +#endif diff --git a/src/systems/perfect_comms/CMakeLists.txt b/src/systems/perfect_comms/CMakeLists.txt new file mode 100644 index 0000000000..fae61ef801 --- /dev/null +++ b/src/systems/perfect_comms/CMakeLists.txt @@ -0,0 +1,6 @@ +gz_add_system(perfect-comms + SOURCES + PerfectComms.cc + PUBLIC_LINK_LIBS + ignition-common${IGN_COMMON_VER}::ignition-common${IGN_COMMON_VER} +) diff --git a/src/systems/perfect_comms/PerfectComms.cc b/src/systems/perfect_comms/PerfectComms.cc new file mode 100644 index 0000000000..08f24cc033 --- /dev/null +++ b/src/systems/perfect_comms/PerfectComms.cc @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include +#include +#include "ignition/gazebo/comms/Broker.hh" +#include "ignition/gazebo/comms/MsgManager.hh" +#include "ignition/gazebo/Util.hh" +#include "PerfectComms.hh" + +using namespace ignition; +using namespace gazebo; +using namespace systems; + +class ignition::gazebo::systems::PerfectComms::Implementation +{ +}; + +////////////////////////////////////////////////// +PerfectComms::PerfectComms() + : dataPtr(ignition::utils::MakeUniqueImpl()) +{ +} + +////////////////////////////////////////////////// +void PerfectComms::Load(const Entity &/*_entity*/, + std::shared_ptr /*_sdf*/, + EntityComponentManager &/*_ecm*/, + EventManager &/*_eventMgr*/) +{ +} + +////////////////////////////////////////////////// +void PerfectComms::Step( + const UpdateInfo &/*_info*/, + const comms::Registry &_currentRegistry, + comms::Registry &_newRegistry, + EntityComponentManager &_ecm) +{ + // Initialize entity if needed. + for (auto & [address, content] : _currentRegistry) + { + if (content.entity == kNullEntity) + { + auto entities = gazebo::entitiesFromScopedName(content.modelName, _ecm); + if (entities.empty()) + continue; + + auto entityId = *(entities.begin()); + if (entityId == kNullEntity) + continue; + + _newRegistry[address].entity = entityId; + } + } + + for (auto & [address, content] : _currentRegistry) + { + // Reference to the outbound queue for this address. + auto &outbound = content.outboundMsgs; + + // Is the source address bound? + auto itSrc = _currentRegistry.find(address); + bool srcAddressBound = itSrc != _currentRegistry.end(); + + // Is the source address attached to a model? + bool srcAddressAttachedToModel = + srcAddressBound && itSrc->second.entity != kNullEntity; + + if (srcAddressAttachedToModel) + { + // All these messages need to be processed. + for (auto &msg : outbound) + { + // Is the destination address bound? + auto itDst = _currentRegistry.find(msg->dst_address()); + bool dstAddressBound = itDst != _currentRegistry.end(); + + // Is the destination address attached to a model? + bool dstAddressAttachedToModel = + dstAddressBound && itDst->second.entity != kNullEntity; + + if (dstAddressAttachedToModel) + _newRegistry[msg->dst_address()].inboundMsgs.push_back(msg); + } + } + + // Clear the outbound queue. + _newRegistry[address].outboundMsgs.clear(); + } +} + +IGNITION_ADD_PLUGIN(PerfectComms, + ignition::gazebo::System, + comms::ICommsModel::ISystemConfigure, + comms::ICommsModel::ISystemPreUpdate) + +IGNITION_ADD_PLUGIN_ALIAS(PerfectComms, + "ignition::gazebo::systems::PerfectComms") diff --git a/src/systems/perfect_comms/PerfectComms.hh b/src/systems/perfect_comms/PerfectComms.hh new file mode 100644 index 0000000000..862b47ef1c --- /dev/null +++ b/src/systems/perfect_comms/PerfectComms.hh @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef IGNITION_GAZEBO_SYSTEMS_PERFECTCOMMS_HH_ +#define IGNITION_GAZEBO_SYSTEMS_PERFECTCOMMS_HH_ + +#include + +#include +#include +#include "ignition/gazebo/comms/ICommsModel.hh" +#include "ignition/gazebo/System.hh" + +namespace ignition +{ +namespace gazebo +{ +// Inline bracket to help doxygen filtering. +inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE { +namespace systems +{ + // Forward declarations. + class MsgManager; + + /// \brief An example of a comms model. + /// This model always delivers any message to its destination. + class PerfectComms + : public comms::ICommsModel + { + /// \brief Constructor. + public: explicit PerfectComms(); + + // Documentation inherited. + public: void Load(const Entity &_entity, + std::shared_ptr _sdf, + EntityComponentManager &_ecm, + EventManager &_eventMgr) override; + + // Documentation inherited. + public: void Step(const ignition::gazebo::UpdateInfo &_info, + const comms::Registry &_currentRegistry, + comms::Registry &_newRegistry, + EntityComponentManager &_ecm); + + /// \brief Private data pointer. + IGN_UTILS_UNIQUE_IMPL_PTR(dataPtr) + }; + } +} +} +} + +#endif diff --git a/test/integration/CMakeLists.txt b/test/integration/CMakeLists.txt index f9c1612fdd..21f55f5841 100644 --- a/test/integration/CMakeLists.txt +++ b/test/integration/CMakeLists.txt @@ -46,6 +46,7 @@ set(tests odometry_publisher.cc particle_emitter.cc particle_emitter2.cc + perfect_comms.cc performer_detector.cc physics_system.cc play_pause.cc diff --git a/test/integration/perfect_comms.cc b/test/integration/perfect_comms.cc new file mode 100644 index 0000000000..56c77957c8 --- /dev/null +++ b/test/integration/perfect_comms.cc @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2022 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include + +#include +#include +#include + +#include +#include +#include +#include "ignition/gazebo/Server.hh" +#include "ignition/gazebo/test_config.hh" // NOLINT(build/include) +#include "../helpers/EnvTestFixture.hh" + +using namespace ignition; +using namespace gazebo; + +///////////////////////////////////////////////// +class PerfectCommsTest : public InternalFixture<::testing::Test> +{ +}; + +///////////////////////////////////////////////// +TEST_F(PerfectCommsTest, IGN_UTILS_TEST_DISABLED_ON_WIN32(PerfectComms)) +{ + // Start server + ServerConfig serverConfig; + const auto sdfFile = + ignition::common::joinPaths(std::string(PROJECT_SOURCE_PATH), + "examples", "worlds", "perfect_comms.sdf"); + serverConfig.SetSdfFile(sdfFile); + + Server server(serverConfig); + EXPECT_FALSE(server.Running()); + EXPECT_FALSE(*server.Running(0)); + + // Run server + size_t iters = 1000; + server.Run(true, iters, false); + + unsigned int msgCounter = 0u; + std::mutex mutex; + auto cb = [&](const msgs::Dataframe &_msg) -> void + { + // Verify msg content + std::lock_guard lock(mutex); + std::string expected = "hello world " + std::to_string(msgCounter); + + ignition::msgs::StringMsg receivedMsg; + receivedMsg.ParseFromString(_msg.data()); + EXPECT_EQ(expected, receivedMsg.data()); + msgCounter++; + }; + + // Create subscriber. + ignition::transport::Node node; + std::string addr = "addr1"; + std::string subscriptionTopic = "addr1/rx"; + + // Subscribe to a topic by registering a callback. + auto cbFunc = std::function(cb); + EXPECT_TRUE(node.Subscribe(subscriptionTopic, cbFunc)) + << "Error subscribing to topic [" << subscriptionTopic << "]"; + + // Create publisher. + std::string publicationTopic = "/broker/msgs"; + auto pub = node.Advertise(publicationTopic); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // Prepare the message. + ignition::msgs::Dataframe msg; + msg.set_src_address("addr2"); + msg.set_dst_address(addr); + + // Publish 10 messages. + ignition::msgs::StringMsg payload; + unsigned int pubCount = 10u; + for (unsigned int i = 0u; i < pubCount; ++i) + { + // Prepare the payload. + payload.set_data("hello world " + std::to_string(i)); + std::string serializedData; + EXPECT_TRUE(payload.SerializeToString(&serializedData)) + << payload.DebugString(); + msg.set_data(serializedData); + EXPECT_TRUE(pub.Publish(msg)); + server.Run(true, 100, false); + } + + // Verify subscriber received all msgs. + int sleep = 0; + bool done = false; + while (!done && sleep++ < 10) + { + std::lock_guard lock(mutex); + done = msgCounter == pubCount; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_EQ(pubCount, msgCounter); +}