Skip to content

Commit

Permalink
Make dynamic tracks configurable
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed Sep 6, 2023
1 parent 63d01b9 commit 1c5cb9e
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 62 deletions.
36 changes: 31 additions & 5 deletions ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class DdsBridge : public Bridge
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config,
const types::ParticipantId& discoverer_participant_id);
const bool dynamic_tracks,
const types::ParticipantId& discoverer_participant_id = "");

DDSPIPE_CORE_DllAPI
~DdsBridge();
Expand All @@ -81,13 +82,24 @@ class DdsBridge : public Bridge
DDSPIPE_CORE_DllAPI
void disable() noexcept override;

/**
* Create the readers, writers, and tracks that are required by the routes.
*
* Thread safe
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void create_all_tracks();

/**
* Build the DataReaders and DataWriters inside the bridge for the new participant,
* and add them to the Tracks.
*
* Thread safe
*
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become inactive.
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become
* inactive.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
Expand All @@ -101,16 +113,30 @@ class DdsBridge : public Bridge
*
* Thread safe
*
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become inactive.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become
* inactive.
*/
DDSPIPE_CORE_DllAPI
void remove_from_tracks(
const types::ParticipantId& discoverer_participant_id) noexcept;

protected:

/**
* Add each Participant's DataWriters to its Track.
* If the Participant's DataReader doesn't exist, create it.
* If the Participant's Track doesn't exist, create it.
*
* Thread safe
*
* @param writers: The map of ids to writers that are required for the tracks.
*
* @throw InitializationException in case \c IReaders creation fails.
*/
DDSPIPE_CORE_DllAPI
void add_writers_to_tracks_(
std::map<types::ParticipantId, std::shared_ptr<IWriter>>& writers);

utils::Heritable<types::DistributedTopic> topic_;

RoutesConfiguration::RoutesMap routes_;
Expand Down
4 changes: 2 additions & 2 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DdsPipe
bool start_enable = false,
const RoutesConfiguration& routes_config = {},
const TopicRoutesConfiguration& topic_routes_config = {},
const bool delete_unused_entities = false);
const bool dynamic_tracks = false);

/**
* @brief Destroy the DdsPipe object
Expand Down Expand Up @@ -369,7 +369,7 @@ class DdsPipe
TopicRoutesConfiguration topic_routes_config_;

//! Whether readers that aren't connected to any writers should be deleted
bool delete_unused_entities_;
bool dynamic_tracks_;

/////
// AUXILIAR VARIABLES
Expand Down
176 changes: 126 additions & 50 deletions ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,23 @@ DdsBridge::DdsBridge(
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config,
const ParticipantId& discoverer_participant_id)
const bool dynamic_tracks,
const ParticipantId& discoverer_participant_id /* = "" */)
: Bridge(participants_database, payload_pool, thread_pool)
, topic_(topic)
{
logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << ".");

routes_ = routes_config();

if (discoverer_participant_id == "builtin-participant")
if (dynamic_tracks && discoverer_participant_id != "")
{
for (const ParticipantId& id : participants_->get_participants_ids())
{
add_to_tracks(id);
}
// The builtin participants and some tests use an empty discoverer participant id
add_to_tracks(discoverer_participant_id);
}
else
{
add_to_tracks(discoverer_participant_id);
create_all_tracks();
}

logDebug(DDSPIPE_DDSBRIDGE, "DdsBridge " << *this << " created.");
Expand Down Expand Up @@ -98,14 +97,59 @@ void DdsBridge::disable() noexcept
}
}

void DdsBridge::create_all_tracks()
{
const auto& ids = participants_->get_participants_ids();

// Figure out what writers need to be created
std::set<ParticipantId> writers_to_create;

for (const ParticipantId& id : ids)
{
const auto& routes_it = routes_.find(id);

if (routes_it != routes_.end())
{
// The reader has a route. Create only the writers in the route.
const auto& writers_ids = routes_it->second;
writers_to_create.insert(writers_ids.begin(), writers_ids.end());
}
else
{
// The reader doesn't have a route. Create every writer (+ itself if repeater)
auto writers_ids = ids;

if (!participants_->get_participant(id)->is_repeater())
{
// The participant is not a repeater. Do not add its writer.
writers_ids.erase(id);
}

writers_to_create.insert(writers_ids.begin(), writers_ids.end());
}
}

// Create the writers.
std::map<ParticipantId, std::shared_ptr<IWriter>> writers;

for (const auto& id : writers_to_create)
{
std::shared_ptr<IParticipant> participant = participants_->get_participant(id);
writers[id] = participant->create_writer(*topic_);
}

// Add the writers to the tracks they have routes for.
add_writers_to_tracks_(writers);
}

void DdsBridge::add_to_tracks(
const ParticipantId& discoverer_participant_id)
{
// A new subscriber has been discovered.
// Check if the writer for this participant already exists.
for (const auto& id_to_track : tracks_)
for (const auto& tracks_it : tracks_)
{
const auto& track = id_to_track.second;
const auto& track = tracks_it.second;

if (track->has_writer(discoverer_participant_id))
{
Expand All @@ -114,40 +158,93 @@ void DdsBridge::add_to_tracks(
}
}

// Create the writer.
std::shared_ptr<IParticipant> participant = participants_->get_participant(discoverer_participant_id);

std::map<ParticipantId, std::shared_ptr<IWriter>> id_to_writer;
id_to_writer[discoverer_participant_id] = participant->create_writer(*topic_);
// Create the writer.
std::map<ParticipantId, std::shared_ptr<IWriter>> writer;
writer[discoverer_participant_id] = participant->create_writer(*topic_);

// Add the writer to the tracks it has routes for.
add_writers_to_tracks_(writer);
}

void DdsBridge::remove_from_tracks(
const ParticipantId& discoverer_participant_id) noexcept
{
for (const auto& id_to_track : tracks_)
{
const auto& id = id_to_track.first;
const auto& track = id_to_track.second;

// If the writer is in the track, remove it.
track->remove_writer(discoverer_participant_id);

if (track->count_writers() <= 0)
{
// The track doesn't have any writers. Remove it.
tracks_.erase(id);
}
}
}

// Create the necessary readers and tracks.
void DdsBridge::add_writers_to_tracks_(
std::map<ParticipantId, std::shared_ptr<IWriter>>& writers)
{
// Add writers to the tracks of the readers in their route.
// If the readers in their route don't exist, create them with their tracks.
for (const ParticipantId& id : participants_->get_participants_ids())
{
const auto& it = routes_.find(id);

if (it != routes_.end() && it->second.find(discoverer_participant_id) == it->second.end())
// Select the necessary writers
std::map<ParticipantId, std::shared_ptr<IWriter>> writers_of_reader;

const auto& routes_it = routes_.find(id);

if (routes_it != routes_.end())
{
// The Participant has a route and the discoverer is not in it.
// There can be no changes to the tracks.
continue;
// The reader has a route. Add only the writers in the route.
const auto& writers_in_route_of_reader = routes_it->second;

for (const auto& writer_id : writers_in_route_of_reader)
{
writers_of_reader[id] = writers[writer_id];
}
}
else
{
// The reader doesn't have a route. Add every writer (+ itself if repeater)
writers_of_reader = writers;

if (!participants_->get_participant(id)->is_repeater())
{
// The participant is not a repeater. Do not add its writer.
writers_of_reader.erase(id);
}
}

if (id == discoverer_participant_id && !participants_->get_participant(id)->is_repeater())
if (writers_of_reader.size() == 0)
{
// Don't connect a participant's reader and writer if the participant is not a repeater.
// There are no writers in the route. Don't create the reader or the track.
continue;
}

// Create a copy of the writer
std::map<ParticipantId, std::shared_ptr<IWriter>> id_to_dst_writer(id_to_writer);

if (tracks_.count(id))
{
// Enable the writer before adding it to the track.
id_to_dst_writer[discoverer_participant_id]->enable();

// The track already exists. Add the writer to it.
tracks_[id]->add_writer(discoverer_participant_id, id_to_dst_writer[discoverer_participant_id]);
// The track already exists. Add the writers to it.
for (const auto& writers_of_reader_it : writers_of_reader)
{
const auto& writer_id = writers_of_reader_it.first;
const auto& writer = writers_of_reader_it.second;

if (!tracks_[id]->has_writer(writer_id))
{
// Enable the writer before adding it to the track.
writer->enable();

// Add the writer to the track
tracks_[id]->add_writer(writer_id, writer);
}
}
}
else
{
Expand All @@ -159,31 +256,10 @@ void DdsBridge::add_to_tracks(
topic_,
id,
std::move(reader),
std::move(id_to_dst_writer),
std::move(writers_of_reader),
payload_pool_,
thread_pool_);
}

tracks_[id]->enable();
}
}

void DdsBridge::remove_from_tracks(
const ParticipantId& discoverer_participant_id) noexcept
{
for (const auto& id_to_track : tracks_)
{
const auto& id = id_to_track.first;
const auto& track = id_to_track.second;

// If the writer is in the track, remove it.
track->remove_writer(discoverer_participant_id);

if (track->count_writers() <= 0)
{
// The track doesn't have any writers. Remove it.
tracks_.erase(id);
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions ddspipe_core/src/cpp/core/DdsPipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ DdsPipe::DdsPipe(
bool start_enable, /* = false */
const RoutesConfiguration& routes_config, /* = {} */
const TopicRoutesConfiguration& topic_routes_config, /* = {} */
const bool delete_unused_entities /* = false */)
const bool dynamic_tracks /* = false */)
: allowed_topics_(allowed_topics)
, discovery_database_(discovery_database)
, payload_pool_(payload_pool)
Expand All @@ -47,7 +47,7 @@ DdsPipe::DdsPipe(
, enabled_(false)
, routes_config_(routes_config)
, topic_routes_config_(topic_routes_config)
, delete_unused_entities_(delete_unused_entities)
, dynamic_tracks_(dynamic_tracks)
{
logDebug(DDSPIPE, "Creating DDS Pipe.");

Expand Down Expand Up @@ -326,7 +326,7 @@ void DdsPipe::removed_endpoint_nts_(
"Error finding Bridge for topic " << topic <<
". The Bridge does not exist.");
}
else if (delete_unused_entities_)
else if (dynamic_tracks_)
{
it_bridge->second->remove_from_tracks(endpoint.discoverer_participant_id);
}
Expand All @@ -343,7 +343,7 @@ void DdsPipe::init_bridges_nts_(
{
for (const auto& topic : builtin_topics)
{
discovered_topic_nts_(topic, "builtin-participant");
discovered_topic_nts_(topic, "");
create_new_bridge_nts_(topic, false);
}
}
Expand Down Expand Up @@ -440,6 +440,7 @@ void DdsPipe::create_new_bridge_nts_(
payload_pool_,
thread_pool_,
routes_config__,
dynamic_tracks_,
discoverer_participant_id);

if (enabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ constexpr const char* MAX_HISTORY_DEPTH_TAG("max-depth"); //! Maximum size (numb
constexpr const char* DOWNSAMPLING_TAG("downsampling"); //! Keep 1 out of every *downsampling* samples received
constexpr const char* MAX_RECEPTION_RATE_TAG("max-reception-rate"); //! Process up to *max_reception_rate* samples in a 1 second bin
constexpr const char* WAIT_ALL_ACKED_TIMEOUT_TAG("wait-all-acked-timeout"); //! Wait for a maximum of *wait-all-acked-timeout* ms until all msgs sent by reliable writers are acknowledged by their matched readers
constexpr const char* DELETE_UNUSED_ENTITIES_TAG("delete-unused-entities"); //! Delete the unused entities and tracks.
constexpr const char* DYNAMIC_TRACKS_TAG("dynamic-tracks"); //! Dynamically create and delete tracks.

// XML configuration tags
constexpr const char* XML_TAG("xml"); //! Tag to read xml configuration
Expand Down

0 comments on commit 1c5cb9e

Please sign in to comment.