From c26cca30704ed4cd003cb57f514c6bc2e821609d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=C3=ADaz=20Qu=C3=ADlez?= <33602217+Tempate@users.noreply.github.com> Date: Wed, 22 Nov 2023 11:16:37 +0000 Subject: [PATCH] Make the trigger of the DDS Pipe callbacks configurable (#67) * Dynamically trigger the creation of entities Signed-off-by: tempate * Rebase fix Signed-off-by: tempate * Add none entity-creation-trigger Signed-off-by: tempate * Avoid adding endpoints in the SchemaParticipant Signed-off-by: tempate * Rename entity_creation_trigger to discovery_trigger Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Check discovery trigger in services Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate * Apply suggestions Signed-off-by: tempate --------- Signed-off-by: tempate --- .../configuration/DdsPipeConfiguration.hpp | 13 +++++++ .../include/ddspipe_core/core/DdsPipe.hpp | 19 +++++++++- .../configuration/DdsPipeConfiguration.cpp | 6 ++++ ddspipe_core/src/cpp/core/DdsPipe.cpp | 35 +++++++++++++++---- .../dynamic_types/SchemaParticipant.cpp | 19 ---------- .../ddspipe_yaml/yaml_configuration_tags.hpp | 1 + 6 files changed, 66 insertions(+), 27 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp index fe8bfb56..8420a2a1 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/DdsPipeConfiguration.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -32,6 +33,15 @@ namespace eprosima { namespace ddspipe { namespace core { +//! Possible kinds of discovery triggers +ENUMERATION_BUILDER( + DiscoveryTrigger, + READER, //! The discovery callbacks get triggered by the discovery of a reader. + WRITER, //! The discovery callbacks get triggered by the discovery of a writer. + NONE, //! The discovery callbacks don't get triggered by the discovery of readers or writers. + ANY //! The discovery callbacks get triggered by the discovery of either a reader or a writer. + ); + /** * Configuration structure encapsulating the configuration of a \c DdsPipe instance. */ @@ -108,6 +118,9 @@ struct DdsPipeConfiguration : public IConfiguration //! Whether the DDS Pipe should be initialized enabled. bool init_enabled = false; + + //! The type of the entity whose discovery should trigger the discovery callbacks. + DiscoveryTrigger discovery_trigger = DiscoveryTrigger::READER; }; } /* namespace core */ diff --git a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp index ef61ad51..57687b03 100644 --- a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp +++ b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp @@ -213,9 +213,26 @@ class DdsPipe const types::Endpoint& endpoint) noexcept; /** - * @brief Method called every time a new endpoint has been discovered, removed, or updated. + * @brief Check whether the kind of an endpoint matches the discovery trigger kind. + * + * Method called every time a new endpoint has been discovered, removed, or updated. + * + * @param [in] endpoint : endpoint discovered, removed, or updated. + * + * @return Whether the endpoint's kind matches the discovery trigger. + */ + bool is_endpoint_kind_relevant_( + const types::Endpoint& endpoint) noexcept; + + /** + * @brief Check whether an endpoint is the first endpoint discovered or the last removed. + * + * Method called every time a new endpoint has been discovered, removed, or updated. + * This method calls \c is_endpoint_kind_relevant_ * * @param [in] endpoint : endpoint discovered, removed, or updated. + * + * @return Whether the DdsPipe's discovery callbacks need to process the endpoint. */ bool is_endpoint_relevant_( const types::Endpoint& endpoint) noexcept; diff --git a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp index 39d30008..5e3f8561 100644 --- a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp @@ -29,6 +29,12 @@ namespace core { bool DdsPipeConfiguration::is_valid( utils::Formatter& error_msg) const noexcept { + if (remove_unused_entities && discovery_trigger != DiscoveryTrigger::READER) + { + error_msg << "A discovery-trigger different from reader is incompatible with remove-unused-entities."; + return false; + } + return routes.is_valid(error_msg) && topic_routes.is_valid(error_msg); } diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index 87bf0b42..af966a6f 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -80,9 +81,6 @@ DdsPipe::DdsPipe( } // Init discovery database - // The entities should not be added to the Discovery Database until the builtin topics have been created. - // This is due to the fact that the Participants endpoints start discovering topics with different configuration - // than the one specified in the yaml configuration file. discovery_database_->start(); logDebug(DDSPIPE, "DDS Pipe created."); @@ -284,7 +282,7 @@ void DdsPipe::discovered_endpoint_nts_( if (RpcTopic::is_service_topic(endpoint.topic)) { - if (endpoint.is_reader() && endpoint.is_server_endpoint()) + if (is_endpoint_kind_relevant_(endpoint) && endpoint.is_server_endpoint()) { // Service server discovered discovered_service_nts_(RpcTopic( @@ -347,18 +345,41 @@ void DdsPipe::updated_endpoint_nts_( } } +bool DdsPipe::is_endpoint_kind_relevant_( + const Endpoint& endpoint) noexcept +{ + switch (configuration_.discovery_trigger) + { + case DiscoveryTrigger::READER: + return endpoint.is_reader(); + + case DiscoveryTrigger::WRITER: + return endpoint.is_writer(); + + case DiscoveryTrigger::ANY: + return true; + + case DiscoveryTrigger::NONE: + return false; + + default: + utils::tsnh(utils::Formatter() << "Invalid Discovery Trigger."); + return false; + } +} + bool DdsPipe::is_endpoint_relevant_( const Endpoint& endpoint) noexcept { - if (!endpoint.is_reader()) + if (!is_endpoint_kind_relevant_(endpoint)) { return false; } - auto is_endpoint_relevant = [endpoint](const Endpoint& entity) + auto is_endpoint_relevant = [&](const Endpoint& entity) { return entity.active && - entity.is_reader() && + is_endpoint_kind_relevant_(entity) && entity.topic == endpoint.topic && entity.discoverer_participant_id == endpoint.discoverer_participant_id; }; diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp index 26b0d6c2..5fbb30e7 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/SchemaParticipant.cpp @@ -47,25 +47,6 @@ SchemaParticipant::SchemaParticipant( , discovery_database_(discovery_database) , schema_handler_(schema_handler) { - // Simulate that there is a reader of type object to force this track creation - discovery_database_->add_endpoint( - rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) - ); - - // Force for every topic found to create track by creating simulated readers - // NOTE: this could change for: in DDS Pipe change that only readers create track - discovery_database_->add_endpoint_discovered_callback( - [this](Endpoint endpoint_discovered) - { - if (endpoint_discovered.is_writer() && endpoint_discovered.discoverer_participant_id != this->id()) - { - discovery_database_->add_endpoint( - rtps::CommonParticipant::simulate_endpoint(endpoint_discovered.topic, - endpoint_discovered.discoverer_participant_id) - ); - } - } - ); } ParticipantId SchemaParticipant::id() const noexcept diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index 29702841..c5191f82 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -141,6 +141,7 @@ constexpr const char* SPECS_QOS_TAG("qos"); //! Global Topic QoS constexpr const char* NUMBER_THREADS_TAG("threads"); //! Number of threads to configure the thread pool constexpr const char* WAIT_ALL_ACKED_TIMEOUT_TAG("wait-all-acked-timeout"); //! Wait for a maximum of *wait-all-acked-timeout* ms until all msgs sent by reliable writers are acknowledged by their matched readers constexpr const char* REMOVE_UNUSED_ENTITIES_TAG("remove-unused-entities"); //! Dynamically create and delete entities and tracks. +constexpr const char* DISCOVERY_TRIGGER_TAG("discovery-trigger"); //! Make the trigger of the DDS Pipe callbacks configurable. // XML configuration tags constexpr const char* XML_TAG("xml"); //! Tag to read xml configuration