Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20816] Properly delete builtin statistics writers upon delete_contained_entities() (backport #4891) #4917

Merged
merged 2 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class DomainParticipantImpl

DomainId_t get_domain_id() const;

ReturnCode_t delete_contained_entities();
virtual ReturnCode_t delete_contained_entities();

ReturnCode_t assert_liveliness();

Expand Down
6 changes: 6 additions & 0 deletions src/cpp/statistics/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ void DomainParticipantImpl::disable()
efd::DomainParticipantImpl::disable();
}

ReturnCode_t DomainParticipantImpl::delete_contained_entities()
{
delete_statistics_builtin_entities();
return efd::DomainParticipantImpl::delete_contained_entities();
}

efd::PublisherImpl* DomainParticipantImpl::create_publisher_impl(
const efd::PublisherQos& qos,
efd::PublisherListener* listener)
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/statistics/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ class DomainParticipantImpl : public efd::DomainParticipantImpl
static bool is_statistics_topic_name(
const std::string& topic_name) noexcept;

/**
* @brief This override calls the parent method and returns builtin publishers to nullptr
*
* @return RETCODE_OK if successful
* @note This method is meant to be used followed by a deletion of the participant as it implies
* the deletion of the builtin statistics publishers.
*/
ReturnCode_t delete_contained_entities() override;

protected:

/**
Expand Down
21 changes: 18 additions & 3 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,18 @@ class PubSubReader
return get_last_sequence_received();
}

void startReception(
size_t expected_samples)
{
{
std::unique_lock<std::mutex> lock(mutex_);
current_processed_count_ = 0;
number_samples_expected_ = expected_samples;
last_seq.clear();
}
receiving_.store(true);
}

void stopReception()
{
receiving_.store(false);
Expand Down Expand Up @@ -1790,9 +1802,12 @@ class PubSubReader
if (info.valid_data
&& info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE)
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
if (!total_msgs_.empty())
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
}
++current_processed_count_;
default_receive_print<type>(data);
cv_.notify_one();
Expand Down
94 changes: 94 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,97 @@ TEST(DDSStatistics, discovery_topic_physical_data_delete_physical_properties)
test_discovery_topic_physical_data(DiscoveryTopicPhysicalDataTest::NO_PHYSICAL_DATA);
#endif // FASTDDS_STATISTICS
}

class CustomStatisticsParticipantSubscriber : public PubSubReader<HelloWorldPubSubType>
{
public:

CustomStatisticsParticipantSubscriber(
const std::string& topic_name)
: PubSubReader<HelloWorldPubSubType>(topic_name)
{
}

void destroy() override
{
participant_->delete_contained_entities();
DomainParticipantFactory::get_instance()->delete_participant(participant_);
participant_ = nullptr;
}

};

// Regression test for #20816. When an application is terminated with delete_contained_entities()
// it has to properly finish. The test creates a number of participants with some of them sharing the same topic.
// Each participant asynchronously sends and receive a number of samples. In the readers, when a minumm number of samples
// is received the destroy() method is called (abruptly). The test checks that the application finishes successfully
TEST(DDSStatistics, correct_deletion_upon_delete_contained_entities)
{
#ifdef FASTDDS_STATISTICS

//! Set environment variable and create participant using Qos set by code
const char* value = "HISTORY_LATENCY_TOPIC;NETWORK_LATENCY_TOPIC;"
"PUBLICATION_THROUGHPUT_TOPIC;SUBSCRIPTION_THROUGHPUT_TOPIC;RTPS_SENT_TOPIC;"
"RTPS_LOST_TOPIC;HEARTBEAT_COUNT_TOPIC;ACKNACK_COUNT_TOPIC;NACKFRAG_COUNT_TOPIC;"
"GAP_COUNT_TOPIC;DATA_COUNT_TOPIC;RESENT_DATAS_TOPIC;SAMPLE_DATAS_TOPIC;"
"PDP_PACKETS_TOPIC;EDP_PACKETS_TOPIC;DISCOVERY_TOPIC;PHYSICAL_DATA_TOPIC;";

#ifdef _WIN32
ASSERT_EQ(0, _putenv_s("FASTDDS_STATISTICS", value));
#else
ASSERT_EQ(0, setenv("FASTDDS_STATISTICS", value, 1));
#endif // ifdef _WIN32

size_t n_participants = 5;
size_t n_participants_same_topic = 2;

std::vector<std::shared_ptr<PubSubWriter<HelloWorldPubSubType>>> writers;
std::vector<std::shared_ptr<CustomStatisticsParticipantSubscriber>> readers;

readers.reserve(n_participants);
writers.reserve(n_participants);

std::vector<std::shared_ptr<std::thread>> threads;
threads.reserve(2 * n_participants);

for (size_t i = 0; i < n_participants; ++i)
{
size_t topic_number = (i < n_participants_same_topic) ? 0 : i;

auto writer = std::make_shared<PubSubWriter<HelloWorldPubSubType>>(TEST_TOPIC_NAME + std::to_string(
topic_number));
auto reader =
std::make_shared<CustomStatisticsParticipantSubscriber>(TEST_TOPIC_NAME + std::to_string(topic_number));

std::shared_ptr<std::list<HelloWorld>> data = std::make_shared<std::list<HelloWorld>>(default_helloworld_data_generator(
10));

threads.emplace_back(std::make_shared<std::thread>([reader, data]()
{
reader->init();
ASSERT_TRUE(reader->isInitialized());
reader->startReception(data->size());
reader->block_for_at_least(3);
reader->destroy();
}));

threads.emplace_back(std::make_shared<std::thread>([writer, data]()
{
writer->init();
ASSERT_TRUE(writer->isInitialized());
writer->wait_discovery();
writer->send(*data, 10);
writer->destroy();
}));

writers.push_back(writer);
readers.push_back(reader);
}

for (auto& thread : threads)
{
thread->join();
}

#endif // FASTDDS_STATISTICS
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class DomainParticipantImpl
return false;
}

ReturnCode_t delete_contained_entities()
virtual ReturnCode_t delete_contained_entities()
{
bool can_be_deleted = true;

Expand Down
Loading