Skip to content

Commit

Permalink
Add custom forwarding routes feature
Browse files Browse the repository at this point in the history
Signed-off-by: Juan López Fernández <[email protected]>
  • Loading branch information
juanlofer-eprosima committed Aug 1, 2023
1 parent b7a0908 commit a655e48
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <ddspipe_core/communication/Bridge.hpp>
#include <ddspipe_core/communication/dds/Track.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>

namespace eprosima {
Expand Down Expand Up @@ -55,7 +56,8 @@ class DdsBridge : public Bridge
const utils::Heritable<types::DistributedTopic>& topic,
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool);
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config);

DDSPIPE_CORE_DllAPI
~DdsBridge();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <map>
#include <set>

#include <cpp_utils/Formatter.hpp>

#include <ddspipe_core/configuration/IConfiguration.hpp>
#include <ddspipe_core/types/participant/ParticipantId.hpp>

#include <ddspipe_core/library/library_dll.h>

namespace eprosima {
namespace ddspipe {
namespace core {

// TODO
struct RoutesConfiguration : public ddspipe::core::IConfiguration
{

using RoutesMap = std::map<ddspipe::core::types::ParticipantId, std::set<ddspipe::core::types::ParticipantId>>;

/////////////////////////
// CONSTRUCTORS
/////////////////////////

DDSPIPE_CORE_DllAPI RoutesConfiguration() = default;

/////////////////////////
// METHODS
/////////////////////////

DDSPIPE_CORE_DllAPI virtual bool is_valid(
utils::Formatter& error_msg) const noexcept override;

DDSPIPE_CORE_DllAPI bool is_valid(
utils::Formatter& error_msg,
std::map<ddspipe::core::types::ParticipantId, bool> participant_ids) const noexcept;

/////////////////////////
// OPERATORS
/////////////////////////

DDSPIPE_CORE_DllAPI RoutesMap operator () () const;

/////////////////////////
// VARIABLES
/////////////////////////

RoutesMap routes {};
};

} /* namespace core */
} /* namespace ddspipe */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <set>

#include <cpp_utils/Formatter.hpp>
#include <cpp_utils/memory/Heritable.hpp>

#include <ddspipe_core/configuration/IConfiguration.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/types/topic/dds/DistributedTopic.hpp>

#include <ddspipe_core/library/library_dll.h>

namespace eprosima {
namespace ddspipe {
namespace core {

// TODO
struct TopicRoutesConfiguration : public ddspipe::core::IConfiguration
{

using TopicRoutesMap = std::map<utils::Heritable<ddspipe::core::types::DistributedTopic>, RoutesConfiguration>;

/////////////////////////
// CONSTRUCTORS
/////////////////////////

DDSPIPE_CORE_DllAPI TopicRoutesConfiguration() = default;

/////////////////////////
// METHODS
/////////////////////////

DDSPIPE_CORE_DllAPI virtual bool is_valid(
utils::Formatter& error_msg) const noexcept override;

DDSPIPE_CORE_DllAPI bool is_valid(
utils::Formatter& error_msg,
std::map<ddspipe::core::types::ParticipantId, bool> participant_ids) const noexcept;

/////////////////////////
// OPERATORS
/////////////////////////

DDSPIPE_CORE_DllAPI TopicRoutesMap operator () () const;

/////////////////////////
// VARIABLES
/////////////////////////

TopicRoutesMap topic_routes {};
};

} /* namespace core */
} /* namespace ddspipe */
} /* namespace eprosima */
15 changes: 13 additions & 2 deletions ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

#include <ddspipe_core/communication/dds/DdsBridge.hpp>
#include <ddspipe_core/communication/rpc/RpcBridge.hpp>
#include <ddspipe_core/dynamic/DiscoveryDatabase.hpp>
#include <ddspipe_core/configuration/RoutesConfiguration.hpp>
#include <ddspipe_core/configuration/TopicRoutesConfiguration.hpp>
#include <ddspipe_core/dynamic/AllowedTopicList.hpp>
#include <ddspipe_core/dynamic/DiscoveryDatabase.hpp>
#include <ddspipe_core/dynamic/ParticipantsDatabase.hpp>
#include <ddspipe_core/efficiency/payload/PayloadPool.hpp>

#include <ddspipe_core/library/library_dll.h>

namespace eprosima {
Expand Down Expand Up @@ -65,7 +68,9 @@ class DdsPipe
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const std::set<utils::Heritable<types::DistributedTopic>>& builtin_topics = {},
bool start_enable = false);
bool start_enable = false,
const RoutesConfiguration& routes_config = {},
const TopicRoutesConfiguration& topic_routes_config = {});

/**
* @brief Destroy the DdsPipe object
Expand Down Expand Up @@ -316,6 +321,12 @@ class DdsPipe
*/
std::map<types::RpcTopic, bool> current_services_;

//! Custom forwarding routes
RoutesConfiguration routes_config_;

//! Custom forwarding routes per topic
TopicRoutesConfiguration topic_routes_config_;

/////
// AUXILIAR VARIABLES

Expand Down
123 changes: 94 additions & 29 deletions ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,53 +27,118 @@ DdsBridge::DdsBridge(
const utils::Heritable<DistributedTopic>& topic,
const std::shared_ptr<ParticipantsDatabase>& participants_database,
const std::shared_ptr<PayloadPool>& payload_pool,
const std::shared_ptr<utils::SlotThreadPool>& thread_pool)
const std::shared_ptr<utils::SlotThreadPool>& thread_pool,
const RoutesConfiguration& routes_config)
: Bridge(participants_database, payload_pool, thread_pool)
, topic_(topic)
{
logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << ".");

std::set<ParticipantId> ids = participants_->get_participants_ids();
auto routes = routes_config();

std::map<types::ParticipantId, std::shared_ptr<IWriter>> writers;
std::map<types::ParticipantId, std::shared_ptr<IReader>> readers;
// Determine which endpoints need to be created
std::set<types::ParticipantId> writers_to_create;
std::set<types::ParticipantId> readers_to_create;
for (const ParticipantId& id: ids)
{
const auto& it = routes.find(id);
if (it != routes.end())
{
const auto& src_id = it->first;
const auto& dst_ids = it->second;
if (dst_ids.size() != 0) // Only create reader if there are any destination writers
{
readers_to_create.insert(src_id);
writers_to_create.insert(dst_ids.begin(), dst_ids.end());
}
}
else
{
// When no route is defined, forward to all other participants (+ itself if repeater)
auto dst_ids = ids;
if (!participants_->get_participant(id)->is_repeater())
{
// Do not add writer for this participant because it is not repeater
dst_ids.erase(id);
}
writers_to_create.insert(dst_ids.begin(), dst_ids.end());
readers_to_create.insert(id);
}
}

// Generate readers and writers for each participant
for (const auto& id: ids)
// Generate writers for each participant
std::map<types::ParticipantId, std::shared_ptr<IWriter>> writers;
for (const auto& id: writers_to_create)
{
std::shared_ptr<IParticipant> participant = participants_database->get_participant(id);

writers[id] = participant->create_writer(*topic);
}

// Generate readers for each participant
std::map<types::ParticipantId, std::shared_ptr<IReader>> readers;
for (const auto& id: readers_to_create)
{
std::shared_ptr<IParticipant> participant = participants_database->get_participant(id);
readers[id] = participant->create_reader(*topic);
}

// Generate tracks
for (ParticipantId id: ids)
for (const ParticipantId& id: ids)
{
// List of all Participants
std::map<ParticipantId, std::shared_ptr<IWriter>> writers_except_one =
writers; // Create a copy of the map

if (!participants_->get_participant(id)->is_repeater())
auto it = routes.find(id);
if (it != routes.end())
{
// Remove this Track source participant because it is not repeater
writers_except_one.erase(id);

logDebug(
DDSPIPE_DDSBRIDGE,
"Not adding own Writer to Track in " << *this << " in Participant " << id << ".");
if (it->second.size() == 0)
{
// Do not create track if no destination writers
continue;
}

std::map<ParticipantId, std::shared_ptr<IWriter>> dst_writers;
for (const auto& writer_id : it->second)
{
dst_writers[writer_id] = writers[writer_id];
}

// This insert is required as there is no copy method for Track
// Tracks are always created disabled and then enabled with Bridge enable() method
tracks_[id] =
std::make_unique<Track>(
topic,
id,
readers[id],
std::move(dst_writers),
payload_pool,
thread_pool);
}
else
{
// List of all Participants
std::map<ParticipantId, std::shared_ptr<IWriter>> writers_except_one =
writers; // Create a copy of the map

if (!participants_->get_participant(id)->is_repeater())
{
// Remove this Track source participant because it is not repeater
writers_except_one.erase(id);

logDebug(
DDSPIPE_DDSBRIDGE,
"Not adding own Writer to Track in " << *this << " in Participant " << id << ".");
}

// This insert is required as there is no copy method for Track
// Tracks are always created disabled and then enabled with Bridge enable() method
tracks_[id] =
std::make_unique<Track>(
topic,
id,
readers[id],
std::move(writers_except_one),
payload_pool,
thread_pool);
}

// This insert is required as there is no copy method for Track
// Tracks are always created disabled and then enabled with Bridge enable() method
tracks_[id] =
std::make_unique<Track>(
topic,
id,
readers[id],
std::move(writers_except_one),
payload_pool,
thread_pool);
}

logDebug(DDSPIPE_DDSBRIDGE, "DdsBridge " << *this << " created.");
Expand Down
Loading

0 comments on commit a655e48

Please sign in to comment.