Skip to content

Commit

Permalink
Make the trigger of the DDS Pipe callbacks configurable (#87)
Browse files Browse the repository at this point in the history
* Use builtin topics for internal communication

Signed-off-by: tempate <[email protected]>

* Add internal topics in the constructor

Signed-off-by: tempate <[email protected]>

* Initialize the pipe enabled

Signed-off-by: tempate <[email protected]>

* Minor fix

Signed-off-by: tempate <[email protected]>

* Rename entity_creation_trigger to discovery_trigger

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Remove outdated warning

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

* Uncrustify

Signed-off-by: tempate <[email protected]>

* Apply suggestions

Signed-off-by: tempate <[email protected]>

---------

Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate authored Nov 22, 2023
1 parent 4369964 commit 77ebb97
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 95 deletions.
61 changes: 37 additions & 24 deletions ddsrecorder/src/cpp/tool/DdsRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <cpp_utils/exception/InitializationException.hpp>
#include <cpp_utils/utils.hpp>

#include <ddspipe_core/types/dynamic_types/types.hpp>

#include "DdsRecorder.hpp"

namespace eprosima {
Expand All @@ -32,28 +34,26 @@ DdsRecorder::DdsRecorder(
const yaml::RecorderConfiguration& configuration,
const DdsRecorderStateCode& init_state,
const std::string& file_name)
: configuration_(configuration)
{
// Create Discovery Database
discovery_database_ =
std::make_shared<DiscoveryDatabase>();
discovery_database_ = std::make_shared<DiscoveryDatabase>();

// Create Payload Pool
payload_pool_ =
std::make_shared<FastPayloadPool>();
payload_pool_ = std::make_shared<FastPayloadPool>();

// Create Thread Pool
thread_pool_ =
std::make_shared<SlotThreadPool>(configuration.n_threads);
thread_pool_ = std::make_shared<SlotThreadPool>(configuration_.n_threads);

// Fill MCAP output file settings
participants::McapOutputSettings mcap_output_settings;
if (file_name == "")
{
mcap_output_settings.output_filename = configuration.output_filename;
mcap_output_settings.output_filepath = configuration.output_filepath;
mcap_output_settings.output_filename = configuration_.output_filename;
mcap_output_settings.output_filepath = configuration_.output_filepath;
mcap_output_settings.prepend_timestamp = true;
mcap_output_settings.output_timestamp_format = configuration.output_timestamp_format;
mcap_output_settings.output_local_timestamp = configuration.output_local_timestamp;
mcap_output_settings.output_timestamp_format = configuration_.output_timestamp_format;
mcap_output_settings.output_local_timestamp = configuration_.output_local_timestamp;
}
else
{
Expand All @@ -65,14 +65,14 @@ DdsRecorder::DdsRecorder(
// Create MCAP Handler configuration
participants::McapHandlerConfiguration handler_config(
mcap_output_settings,
configuration.max_pending_samples,
configuration.buffer_size,
configuration.event_window,
configuration.cleanup_period,
configuration.log_publish_time,
configuration.only_with_type,
configuration.mcap_writer_options,
configuration.record_types);
configuration_.max_pending_samples,
configuration_.buffer_size,
configuration_.event_window,
configuration_.cleanup_period,
configuration_.log_publish_time,
configuration_.only_with_type,
configuration_.mcap_writer_options,
configuration_.record_types);

// Create MCAP Handler
mcap_handler_ = std::make_shared<participants::McapHandler>(
Expand All @@ -82,21 +82,34 @@ DdsRecorder::DdsRecorder(

// Create DynTypes Participant
dyn_participant_ = std::make_shared<DynTypesParticipant>(
configuration.simple_configuration,
configuration_.simple_configuration,
payload_pool_,
discovery_database_);
dyn_participant_->init();

// Create Recorder Participant
recorder_participant_ = std::make_shared<SchemaParticipant>(
configuration.recorder_configuration,
configuration_.recorder_configuration,
payload_pool_,
discovery_database_,
mcap_handler_);

// Create and populate Participant Database
participants_database_ =
std::make_shared<ParticipantsDatabase>();
// Create an internal topic to transmit the dynamic types
configuration_.ddspipe_configuration.builtin_topics.insert(
utils::Heritable<DistributedTopic>::make_heritable(type_object_topic()));

if (!configuration_.ddspipe_configuration.allowlist.empty())
{
// The allowlist is not empty. Add the internal topic.
WildcardDdsFilterTopic internal_topic;
internal_topic.topic_name.set_value(TYPE_OBJECT_TOPIC_NAME);

configuration_.ddspipe_configuration.allowlist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(internal_topic));
}

// Create Participant Database
participants_database_ = std::make_shared<ParticipantsDatabase>();

// Populate Participant Database
participants_database_->add_participant(
Expand All @@ -110,7 +123,7 @@ DdsRecorder::DdsRecorder(

// Create DDS Pipe
pipe_ = std::make_unique<DdsPipe>(
configuration.ddspipe_configuration,
configuration_.ddspipe_configuration,
discovery_database_,
payload_pool_,
participants_database_,
Expand Down
3 changes: 3 additions & 0 deletions ddsrecorder/src/cpp/tool/DdsRecorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class DdsRecorder
static participants::McapHandlerStateCode recorder_to_handler_state_(
const DdsRecorderStateCode& recorder_state);

//! Configuration of the DDS Recorder
yaml::RecorderConfiguration configuration_;

//! Payload Pool
std::shared_ptr<ddspipe::core::PayloadPool> payload_pool_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ ReplayerParticipant::ReplayerParticipant(
payload_pool,
discovery_database)
{
// Delete endpoint discovery/removal callbacks inserted in DDS-Pipe core.
// This is to avoid the creation of useless bridges and tracks created when discovering endpoints in topics other
// than the ones present in MCAP.
discovery_database_->clear_all_callbacks();
}

std::shared_ptr<IReader> ReplayerParticipant::create_reader(
Expand Down
9 changes: 3 additions & 6 deletions ddsrecorder_yaml/src/cpp/recorder/YamlReaderConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ void RecorderConfiguration::load_ddsrecorder_configuration_(
// The DDS Pipe should be enabled on start up.
ddspipe_configuration.init_enabled = true;

// The recorder's DdsPipe trigger is the discovery of a writer
ddspipe_configuration.discovery_trigger = DiscoveryTrigger::WRITER;

// Initialize controller domain with the same as the one being recorded
// WARNING: dds tag must have been parsed beforehand
controller_domain = simple_configuration->domain;
Expand Down Expand Up @@ -358,12 +361,6 @@ void RecorderConfiguration::load_dds_configuration_(
{
ddspipe_configuration.allowlist = YamlReader::get_set<utils::Heritable<IFilterTopic>>(yml, ALLOWLIST_TAG,
version);

// Add to allowlist always the type object topic
WildcardDdsFilterTopic internal_topic;
internal_topic.topic_name.set_value(TYPE_OBJECT_TOPIC_NAME);
ddspipe_configuration.allowlist.insert(
utils::Heritable<WildcardDdsFilterTopic>::make_heritable(internal_topic));
}

/////
Expand Down
3 changes: 3 additions & 0 deletions ddsrecorder_yaml/src/cpp/replayer/YamlReaderConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ void ReplayerConfiguration::load_ddsreplayer_configuration_(

// The DDS Pipe should be enabled on start up.
ddspipe_configuration.init_enabled = true;

// The replayer's DdsPipe doesn't get triggered by the discovery of entities
ddspipe_configuration.discovery_trigger = DiscoveryTrigger::NONE;
}
catch (const std::exception& e)
{
Expand Down
13 changes: 6 additions & 7 deletions ddsreplayer/src/cpp/tool/DdsReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastrtps/attributes/ParticipantAttributes.h>

#include <ddspipe_core/types/dynamic_types/types.hpp>

#include <ddsrecorder_participants/common/types/DynamicTypesCollection.hpp>
#include <ddsrecorder_participants/common/types/DynamicTypesCollectionPubSubTypes.hpp>
#include <ddsrecorder_participants/constants.hpp>
Expand All @@ -57,16 +59,13 @@ DdsReplayer::DdsReplayer(
, dyn_publisher_(nullptr)
{
// Create Discovery Database
discovery_database_ =
std::make_shared<DiscoveryDatabase>();
discovery_database_ = std::make_shared<DiscoveryDatabase>();

// Create Payload Pool
payload_pool_ =
std::make_shared<FastPayloadPool>();
payload_pool_ = std::make_shared<FastPayloadPool>();

// Create Thread Pool
thread_pool_ =
std::make_shared<SlotThreadPool>(configuration.n_threads);
thread_pool_ = std::make_shared<SlotThreadPool>(configuration.n_threads);

// Create MCAP Reader Participant
mcap_reader_participant_ = std::make_shared<McapReaderParticipant>(
Expand Down Expand Up @@ -135,7 +134,7 @@ DdsReplayer::DdsReplayer(
}
}

// Generate builtin-topics list by combining information from YAML and MCAP files
// Generate builtin-topics from the topics in the MCAP file
configuration.ddspipe_configuration.builtin_topics = generate_builtin_topics_(configuration, input_file);

// Create DDS Pipe
Expand Down
2 changes: 1 addition & 1 deletion docs/rst/notes/forthcoming_version.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ Next release will include the following **DDS Recorder tool configuration featur
Next release will include the following **DDS Replayer tool configuration features**:

* New configuration option (``topics``) to configure the :ref:`Manual Topics <replayer_manual_topics>`.
* New configuration option (``max-tx-rate``) to configure the :ref:`Max transmission rate <replayer_usage_configuration_max_tx_rate>`.
* New configuration option (``max-tx-rate``) to configure the :ref:`Max transmission rate <replayer_max_tx_rate>`.
* Remove the support for `Built-in Topics <https://dds-recorder.readthedocs.io/en/v0.2.0/rst/replaying/usage/configuration.html#built-in-topics>`_.
30 changes: 12 additions & 18 deletions docs/rst/recording/usage/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ DDS Configuration

Configuration related to DDS communication.

.. _recorder_usage_configuration_domain_id:

DDS Domain
^^^^^^^^^^

Tag ``domain`` configures the :term:`Domain Id`.

.. code-block:: yaml
domain: 101
.. _recorder_builtin_topics:

Built-in Topics
Expand All @@ -55,7 +66,7 @@ The ``builtin-topics`` must specify a ``name`` and ``type`` without wildcard cha
.. _recorder_topic_filtering:

Topic Filtering
---------------
^^^^^^^^^^^^^^^

The |ddsrecorder| automatically detects the topics that are being used in a DDS Network.
The |ddsrecorder| then creates internal DDS :term:`Readers<DataReader>` to record the data published on each topic.
Expand Down Expand Up @@ -213,18 +224,6 @@ If a ``qos`` is not manually configured, it will get its value by discovery.

The :ref:`Topic QoS <recorder_topic_qos>` configured in the Manual Topics take precedence over the :ref:`Specs Topic QoS <recorder_specs_topic_qos>`.

.. _recorder_usage_configuration_domain_id:

DDS Domain
^^^^^^^^^^

Tag ``domain`` configures the :term:`Domain Id`.

.. code-block:: yaml
domain: 101
.. _recorder_ignore_participant_flags:

Ignore Participant Flags
Expand Down Expand Up @@ -286,11 +285,6 @@ Example:
See `Interface Whitelist <https://fast-dds.docs.eprosima.com/en/latest/fastdds/transport/whitelist.html>`_ for more information.

.. warning::

When providing an interface whitelist, external participants with which communication is desired must also be configured with interface whitelisting.


Recorder Configuration
----------------------

Expand Down
47 changes: 12 additions & 35 deletions docs/rst/replaying/usage/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ DDS Configuration

Configuration related to DDS communication.

.. _replayer_usage_configuration_domain_id:

DDS Domain
^^^^^^^^^^

Tag ``domain`` configures the :term:`Domain Id`.

.. code-block:: yaml
domain: 101
.. _replayer_topic_filtering:

Topic Filtering
---------------
^^^^^^^^^^^^^^^

The |ddsreplayer| automatically detects the topics that are being used in a DDS Network.
The |ddsreplayer| then creates internal DDS :term:`Writers<DataWriter>` to replay the data published on each topic.
Expand Down Expand Up @@ -154,10 +165,6 @@ The ``max-tx-rate`` tag limits the frequency [Hz] at which samples are sent by d
It only accepts non-negative numbers.
By default it is set to ``0``; it sends samples at an unlimited transmission rate.

.. note::

The ``max-tx-rate`` tag can be set (in order of precedence) for topics, for participants, and globally in specs.

.. _replayer_manual_topics:

Manual Topics
Expand All @@ -182,17 +189,6 @@ If a ``qos`` is not manually configured, it will get its value by discovery.

The :ref:`Topic QoS <replayer_topic_qos>` configured in the Manual Topics take precedence over the :ref:`Specs Topic QoS <replayer_specs_topic_qos>`.

.. _replayer_usage_configuration_domain_id:

DDS Domain
^^^^^^^^^^

Tag ``domain`` configures the :term:`Domain Id`.

.. code-block:: yaml
domain: 101

.. _replayer_ignore_participant_flags:

Expand Down Expand Up @@ -255,11 +251,6 @@ Example:
See `Interface Whitelist <https://fast-dds.docs.eprosima.com/en/latest/fastdds/transport/whitelist.html>`_ for more information.

.. warning::

When providing an interface whitelist, external participants with which communication is desired must also be configured with interface whitelisting.


Replay Configuration
--------------------

Expand Down Expand Up @@ -355,20 +346,6 @@ By default, data is replayed at the same rate it was published/received.
However, a user might be interested in playing messages back at a rate different than the original one.
This can be accomplished through the playback ``rate`` tag, which accepts positive float values (e.g. 0.5 <--> half speed || 2 <--> double speed).

.. _replayer_usage_configuration_max_tx_rate:

Max Transmission Rate
---------------------

The ``max-tx-rate`` tag limits the frequency [Hz] at which samples are sent by discarding messages transmitted before :code:`1/max-tx-rate` seconds have passed since the last sent message.
It only accepts non-negative numbers.
By default it is set to ``0``; it sends samples at an unlimited transmission rate.

.. note::

The ``max-tx-rate`` tag can be set for topics and globally under the ``replayer`` tag.
If both are set, the configuration under topics prevails.

.. _replayer_replay_configuration_replaytypes:

Replay Types
Expand Down

0 comments on commit 77ebb97

Please sign in to comment.