Skip to content

Commit

Permalink
Make the trigger of the DDS Pipe callbacks configurable (#67)
Browse files Browse the repository at this point in the history
* Dynamically trigger the creation of entities

Signed-off-by: tempate <[email protected]>

* Rebase fix

Signed-off-by: tempate <[email protected]>

* Add none entity-creation-trigger

Signed-off-by: tempate <[email protected]>

* Avoid adding endpoints in the SchemaParticipant

Signed-off-by: tempate <[email protected]>

* Rename entity_creation_trigger to discovery_trigger

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Check discovery trigger in services

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

---------

Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate authored and juanlofer-eprosima committed Nov 22, 2023
1 parent fc59c37 commit c26cca3
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <set>

#include <cpp_utils/Formatter.hpp>
#include <cpp_utils/macros/custom_enumeration.hpp>

#include <ddspipe_core/configuration/IConfiguration.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
Expand All @@ -32,6 +33,15 @@ namespace eprosima {
namespace ddspipe {
namespace core {

//! Possible kinds of discovery triggers
ENUMERATION_BUILDER(
DiscoveryTrigger,
READER, //! The discovery callbacks get triggered by the discovery of a reader.
WRITER, //! The discovery callbacks get triggered by the discovery of a writer.
NONE, //! The discovery callbacks don't get triggered by the discovery of readers or writers.
ANY //! The discovery callbacks get triggered by the discovery of either a reader or a writer.
);

/**
* Configuration structure encapsulating the configuration of a \c DdsPipe instance.
*/
Expand Down Expand Up @@ -108,6 +118,9 @@ struct DdsPipeConfiguration : public IConfiguration

//! Whether the DDS Pipe should be initialized enabled.
bool init_enabled = false;

//! The type of the entity whose discovery should trigger the discovery callbacks.
DiscoveryTrigger discovery_trigger = DiscoveryTrigger::READER;
};

} /* namespace core */
Expand Down
19 changes: 18 additions & 1 deletion ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,26 @@ class DdsPipe
const types::Endpoint& endpoint) noexcept;

/**
* @brief Method called every time a new endpoint has been discovered, removed, or updated.
* @brief Check whether the kind of an endpoint matches the discovery trigger kind.
*
* Method called every time a new endpoint has been discovered, removed, or updated.
*
* @param [in] endpoint : endpoint discovered, removed, or updated.
*
* @return Whether the endpoint's kind matches the discovery trigger.
*/
bool is_endpoint_kind_relevant_(
const types::Endpoint& endpoint) noexcept;

/**
* @brief Check whether an endpoint is the first endpoint discovered or the last removed.
*
* Method called every time a new endpoint has been discovered, removed, or updated.
* This method calls \c is_endpoint_kind_relevant_
*
* @param [in] endpoint : endpoint discovered, removed, or updated.
*
* @return Whether the DdsPipe's discovery callbacks need to process the endpoint.
*/
bool is_endpoint_relevant_(
const types::Endpoint& endpoint) noexcept;
Expand Down
6 changes: 6 additions & 0 deletions ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ namespace core {
bool DdsPipeConfiguration::is_valid(
utils::Formatter& error_msg) const noexcept
{
if (remove_unused_entities && discovery_trigger != DiscoveryTrigger::READER)
{
error_msg << "A discovery-trigger different from reader is incompatible with remove-unused-entities.";
return false;
}

return routes.is_valid(error_msg) && topic_routes.is_valid(error_msg);
}

Expand Down
35 changes: 28 additions & 7 deletions ddspipe_core/src/cpp/core/DdsPipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cpp_utils/exception/InitializationException.hpp>
#include <cpp_utils/exception/InconsistencyException.hpp>
#include <cpp_utils/Log.hpp>
#include <cpp_utils/utils.hpp>

#include <ddspipe_core/core/DdsPipe.hpp>

Expand Down Expand Up @@ -80,9 +81,6 @@ DdsPipe::DdsPipe(
}

// Init discovery database
// The entities should not be added to the Discovery Database until the builtin topics have been created.
// This is due to the fact that the Participants endpoints start discovering topics with different configuration
// than the one specified in the yaml configuration file.
discovery_database_->start();

logDebug(DDSPIPE, "DDS Pipe created.");
Expand Down Expand Up @@ -284,7 +282,7 @@ void DdsPipe::discovered_endpoint_nts_(

if (RpcTopic::is_service_topic(endpoint.topic))
{
if (endpoint.is_reader() && endpoint.is_server_endpoint())
if (is_endpoint_kind_relevant_(endpoint) && endpoint.is_server_endpoint())
{
// Service server discovered
discovered_service_nts_(RpcTopic(
Expand Down Expand Up @@ -347,18 +345,41 @@ void DdsPipe::updated_endpoint_nts_(
}
}

bool DdsPipe::is_endpoint_kind_relevant_(
const Endpoint& endpoint) noexcept
{
switch (configuration_.discovery_trigger)
{
case DiscoveryTrigger::READER:
return endpoint.is_reader();

case DiscoveryTrigger::WRITER:
return endpoint.is_writer();

case DiscoveryTrigger::ANY:
return true;

case DiscoveryTrigger::NONE:
return false;

default:
utils::tsnh(utils::Formatter() << "Invalid Discovery Trigger.");
return false;
}
}

bool DdsPipe::is_endpoint_relevant_(
const Endpoint& endpoint) noexcept
{
if (!endpoint.is_reader())
if (!is_endpoint_kind_relevant_(endpoint))
{
return false;
}

auto is_endpoint_relevant = [endpoint](const Endpoint& entity)
auto is_endpoint_relevant = [&](const Endpoint& entity)
{
return entity.active &&
entity.is_reader() &&
is_endpoint_kind_relevant_(entity) &&
entity.topic == endpoint.topic &&
entity.discoverer_participant_id == endpoint.discoverer_participant_id;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,6 @@ SchemaParticipant::SchemaParticipant(
, discovery_database_(discovery_database)
, schema_handler_(schema_handler)
{
// Simulate that there is a reader of type object to force this track creation
discovery_database_->add_endpoint(
rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id())
);

// Force for every topic found to create track by creating simulated readers
// NOTE: this could change for: in DDS Pipe change that only readers create track
discovery_database_->add_endpoint_discovered_callback(
[this](Endpoint endpoint_discovered)
{
if (endpoint_discovered.is_writer() && endpoint_discovered.discoverer_participant_id != this->id())
{
discovery_database_->add_endpoint(
rtps::CommonParticipant::simulate_endpoint(endpoint_discovered.topic,
endpoint_discovered.discoverer_participant_id)
);
}
}
);
}

ParticipantId SchemaParticipant::id() const noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ constexpr const char* SPECS_QOS_TAG("qos"); //! Global Topic QoS
constexpr const char* NUMBER_THREADS_TAG("threads"); //! Number of threads to configure the thread pool
constexpr const char* WAIT_ALL_ACKED_TIMEOUT_TAG("wait-all-acked-timeout"); //! Wait for a maximum of *wait-all-acked-timeout* ms until all msgs sent by reliable writers are acknowledged by their matched readers
constexpr const char* REMOVE_UNUSED_ENTITIES_TAG("remove-unused-entities"); //! Dynamically create and delete entities and tracks.
constexpr const char* DISCOVERY_TRIGGER_TAG("discovery-trigger"); //! Make the trigger of the DDS Pipe callbacks configurable.

// XML configuration tags
constexpr const char* XML_TAG("xml"); //! Tag to read xml configuration
Expand Down

0 comments on commit c26cca3

Please sign in to comment.