Skip to content

Commit

Permalink
Doxygen, Windows support, and improved readability
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed Sep 1, 2023
1 parent ad80306 commit 90d0215
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 58 deletions.
30 changes: 20 additions & 10 deletions ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DdsBridge : public Bridge
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config,
const types::ParticipantId& discoverer_id);
const types::ParticipantId& discoverer_participant_id);

DDSPIPE_CORE_DllAPI
~DdsBridge();
Expand All @@ -82,22 +82,32 @@ class DdsBridge : public Bridge
void disable() noexcept override;

/**
* Build the DRs and DWs inside the bridge for the new participant,
* Build the DataReaders and DataWriters inside the bridge for the new participant,
* and add them to the Tracks.
*
* THREAD SAFE?
* Thread safe
*
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become inactive.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
utils::ReturnCode add_subscriber(
const types::ParticipantId& id) noexcept;
DDSPIPE_CORE_DllAPI
void add_to_tracks(
const types::ParticipantId& discoverer_participant_id);

/**
* Remove the DW from all the Tracks in the bridge.
* Remove the DRs and Tracks that don't have any DWs.
* Remove the DatWriter from all the Tracks in the bridge.
* Remove the DataReaders and Tracks that don't have any DataWriters.
*
* THREAD SAFE?
* Thread safe
*
* @param discoverer_participant_id: The id of the participant who has discovered that the subscriber has become inactive.
*
* @throw InitializationException in case \c IWriters or \c IReaders creation fails.
*/
utils::ReturnCode remove_subscriber(
const types::ParticipantId& id) noexcept;
DDSPIPE_CORE_DllAPI
void remove_from_tracks(
const types::ParticipantId& discoverer_participant_id) noexcept;

protected:

Expand Down
8 changes: 4 additions & 4 deletions ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class Track
* Add a writer to the track.
* It doesn't do anything if the writer is already in it.
*
* THREAD SAFE?
* Tread safe
*/
DDSPIPE_CORE_DllAPI
void add_writer(
Expand All @@ -111,7 +111,7 @@ class Track
* Remove a writer from the track.
* It doesn't do anything if the writer isn't in the track.
*
* THREAD SAFE?
* Tread safe
*/
DDSPIPE_CORE_DllAPI
void remove_writer(
Expand All @@ -120,7 +120,7 @@ class Track
/**
* Check if a writer is inside the track.
*
* THREAD SAFE?
* Tread safe
*/
DDSPIPE_CORE_DllAPI
bool has_writer(
Expand All @@ -129,7 +129,7 @@ class Track
/**
* Count the number of writers inside the track.
*
* THREAD SAFE?
* Tread safe
*/
DDSPIPE_CORE_DllAPI
int count_writers() noexcept;
Expand Down
11 changes: 9 additions & 2 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,14 @@ class DdsPipe
* If the DdsPipe is enabled, the new Bridge is created and enabled.
*
* @note This is the only method that adds topics to \c current_topics_
* @note This is the only method that adds topics to \c current_topics_discoverers_
*
* @param [in] topic : topic discovered
* @param [in] subscriber_id : id of the subscriber who discovered the topic
* @param [in] discoverer_participant_id : id of the subscriber who discovered the topic
*/
void discovered_topic_nts_(
const utils::Heritable<types::DistributedTopic>& topic,
const types::ParticipantId& subscriber_id) noexcept;
const types::ParticipantId& discoverer_participant_id) noexcept;

/**
* @brief Method called every time a new endpoint (corresponding to a server) has been discovered/updated
Expand Down Expand Up @@ -344,6 +345,12 @@ class DdsPipe
* If the value is true, it means this topic is currently activated.
*/
std::map<utils::Heritable<types::DistributedTopic>, bool> current_topics_;

/**
* @brief List of the ids of the participants who discovered each topic.
*
* Every topic discovered is added to the map.
*/
std::map<utils::Heritable<types::DistributedTopic>, types::ParticipantId> current_topics_discoverers_;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class DiscoveryDatabase
*
* @return Map of endpoints
*/
DDSPIPE_CORE_DllAPI
std::map<types::Guid, types::Endpoint> get_endpoints() const noexcept;

/**
Expand Down
57 changes: 25 additions & 32 deletions ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ DdsBridge::DdsBridge(
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config,
const ParticipantId& discoverer_id)
const ParticipantId& discoverer_participant_id)
: Bridge(participants_database, payload_pool, thread_pool)
, topic_(topic)
{
logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << ".");

routes_ = routes_config();

if (discoverer_id == "built-in")
if (discoverer_participant_id == "builtin-participant")
{
for (const ParticipantId& id : participants_->get_participants_ids())
{
add_subscriber(id);
add_to_tracks(id);
}
}
else
{
add_subscriber(discoverer_id);
add_to_tracks(discoverer_participant_id);
}

logDebug(DDSPIPE_DDSBRIDGE, "DdsBridge " << *this << " created.");
Expand Down Expand Up @@ -98,8 +98,8 @@ void DdsBridge::disable() noexcept
}
}

utils::ReturnCode DdsBridge::add_subscriber(
const ParticipantId& subscriber_id) noexcept
void DdsBridge::add_to_tracks(
const ParticipantId& discoverer_participant_id)
{
// A new subscriber has been discovered.
// Check if the writer for this participant already exists.
Expand All @@ -108,45 +108,45 @@ utils::ReturnCode DdsBridge::add_subscriber(
const auto& id = id_to_track.first;
const auto& track = id_to_track.second;

if (track->has_writer(subscriber_id))
if (track->has_writer(discoverer_participant_id))
{
// The writer already exists. There is nothing to do. Exit.
return utils::ReturnCode::RETCODE_OK;
return;
}
}

// Create the writer.
std::shared_ptr<IParticipant> participant = participants_->get_participant(subscriber_id);
std::shared_ptr<IParticipant> participant = participants_->get_participant(discoverer_participant_id);

std::map<ParticipantId, std::shared_ptr<IWriter>> id_to_writer;
id_to_writer[subscriber_id] = participant->create_writer(*topic_);
id_to_writer[discoverer_participant_id] = participant->create_writer(*topic_);

// Create the necessary readers and tracks.
for (const ParticipantId& id : participants_->get_participants_ids())
{
const auto& it = routes_.find(id);

if (it != routes_.end() && it->second.find(subscriber_id) == it->second.end())
if (it != routes_.end() && it->second.find(discoverer_participant_id) == it->second.end())
{
// The Participant has a route and the subscriber_id is not in it.
// The Participant has a route and the discoverer is not in it.
// There can be no changes to the tracks.
continue;
}

if (id == subscriber_id && !participants_->get_participant(id)->is_repeater())
if (id == discoverer_participant_id && !participants_->get_participant(id)->is_repeater())
{
// Don't connect a participant's reader and writer if the participant is not a repeater.
continue;
}

// Create a copy of the writer
std::map<ParticipantId, std::shared_ptr<IWriter>> id_to_dst_writer;
id_to_dst_writer[subscriber_id] = id_to_writer[subscriber_id];
id_to_dst_writer[discoverer_participant_id] = id_to_writer[discoverer_participant_id];

if (tracks_.count(id))
{
// The track already exists. Add the writer.
tracks_[id]->add_writer(subscriber_id, id_to_dst_writer[subscriber_id]);
// The track already exists. Add the writer to it.
tracks_[id]->add_writer(discoverer_participant_id, id_to_dst_writer[discoverer_participant_id]);
}
else
{
Expand All @@ -157,40 +157,33 @@ utils::ReturnCode DdsBridge::add_subscriber(
tracks_[id] = std::make_unique<Track>(
topic_,
id,
std::move(reader), // SHOULD WE USE std::move HERE?
std::move(reader),
std::move(id_to_dst_writer),
payload_pool_,
thread_pool_);
}

tracks_[id]->enable();
}

return utils::ReturnCode::RETCODE_OK;
}

utils::ReturnCode DdsBridge::remove_subscriber(
const ParticipantId& subscriber_id) noexcept
void DdsBridge::remove_from_tracks(
const ParticipantId& discoverer_participant_id) noexcept
{
for (const auto& id_to_track : tracks_)
{
const auto& id = id_to_track.first;
const auto& track = id_to_track.second;

if (track->has_writer(subscriber_id))
// If the writer is in the track, remove it.
track->remove_writer(discoverer_participant_id);

if (track->count_writers() <= 0)
{
// Remove the writer from the track.
track->remove_writer(subscriber_id);

if (track->count_writers() <= 0)
{
// The track doesn't have any writers. Remove it.
tracks_.erase(id);
}
// The track doesn't have any writers. Remove it.
tracks_.erase(id);
}
}

return utils::ReturnCode::RETCODE_OK;
}

std::ostream& operator <<(
Expand Down
23 changes: 13 additions & 10 deletions ddspipe_core/src/cpp/core/DdsPipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,14 @@ void DdsPipe::removed_endpoint_nts_(

if (it_bridge == bridges_.end())
{
// THE BRIDGE HAS TO EXIST. ERROR.
// The bridge does not exist. Error.
logError(DDSPIPE,
"Error finding Bridge for topic " << topic <<
". The Bridge does not exist.");
}
else
{
it_bridge->second->remove_subscriber(endpoint.discoverer_participant_id);
it_bridge->second->remove_from_tracks(endpoint.discoverer_participant_id);
}
}
else if (endpoint.is_server_endpoint())
Expand All @@ -338,32 +341,32 @@ void DdsPipe::init_bridges_nts_(
{
for (const auto& topic : builtin_topics)
{
discovered_topic_nts_(topic, "built-in");
discovered_topic_nts_(topic, "builtin-participant");
create_new_bridge_nts_(topic, false);
}
}

void DdsPipe::discovered_topic_nts_(
const utils::Heritable<DistributedTopic>& topic,
const ParticipantId& discoverer_id) noexcept
const ParticipantId& discoverer_participant_id) noexcept
{
logInfo(DDSPIPE, "Discovered topic: " << topic << ", by: " << discoverer_id << ".");
logInfo(DDSPIPE, "Discovered topic: " << topic << ", by: " << discoverer_participant_id << ".");

// Check if the bridge (and the topic) already exist.
auto it_bridge = bridges_.find(topic);

if (it_bridge != bridges_.end())
{
// The bridge already exists. Add the discoverer_id.
it_bridge->second->add_subscriber(discoverer_id);
// The bridge already exists. Add the discoverer to the tracks.
it_bridge->second->add_to_tracks(discoverer_participant_id);
return;
}

// Add topic to current_topics as non activated
current_topics_.emplace(topic, false);

// Save the id of the participant who discovered the topic
current_topics_discoverers_.emplace(topic, discoverer_id);
current_topics_discoverers_.emplace(topic, discoverer_participant_id);

// If Pipe is enabled and topic allowed, activate it
if (enabled_ && allowed_topics_->is_topic_allowed(*topic))
Expand Down Expand Up @@ -428,14 +431,14 @@ void DdsPipe::create_new_bridge_nts_(
{
auto routes_config__ = topic_routes_config_().count(topic) !=
0 ? topic_routes_config_()[topic] : routes_config_;
auto discoverer_id = current_topics_discoverers_[topic];
auto discoverer_participant_id = current_topics_discoverers_[topic];

auto new_bridge = std::make_unique<DdsBridge>(topic,
participants_database_,
payload_pool_,
thread_pool_,
routes_config__,
discoverer_id);
discoverer_participant_id);

if (enabled)
{
Expand Down
1 change: 1 addition & 0 deletions ddspipe_core/src/cpp/dynamic/DiscoveryDatabase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ Endpoint DiscoveryDatabase::get_endpoint(

std::map<Guid, Endpoint> DiscoveryDatabase::get_endpoints() const noexcept
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
return entities_;
}

Expand Down

0 comments on commit 90d0215

Please sign in to comment.