Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21245] Fix topic interference on liveliness_changed status (backport #4988) (backport #5032) #5046

Open
wants to merge 1 commit into
base: 2.6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,10 +850,11 @@ bool WLP::remove_local_reader(

bool WLP::automatic_liveliness_assertion()
{
std::lock_guard<std::recursive_mutex> guard(*mp_builtinProtocols->mp_PDP->getMutex());
std::unique_lock<std::recursive_mutex> lock(*mp_builtinProtocols->mp_PDP->getMutex());

if (0 < automatic_writers_.size())
{
lock.unlock();
return send_liveliness_message(automatic_instance_handle_);
}

Expand Down
64 changes: 32 additions & 32 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,40 +340,11 @@ bool StatefulReader::matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease)
{

if (is_alive_ && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}

}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
WriterProxy* wproxy = nullptr;
if (is_alive_)
{
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);

//Remove cachechanges belonging to the unmatched writer
mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));

Expand Down Expand Up @@ -418,7 +389,36 @@ bool StatefulReader::matched_writer_remove(
}
}

return (wproxy != nullptr);
bool ret_val = (wproxy != nullptr);
if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

return ret_val;
}

bool StatefulReader::matched_writer_is_matched(
Expand Down
61 changes: 33 additions & 28 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,33 +189,8 @@ bool StatelessReader::matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease)
{
if (liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);
bool ret_val = false;

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}
{
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);

Expand Down Expand Up @@ -243,11 +218,41 @@ bool StatelessReader::matched_writer_remove(
guard.unlock();
mp_listener->on_writer_discovery(this, WriterDiscoveryInfo::REMOVED_WRITER, writer_guid, nullptr);
}
return true;
ret_val = true;
break;
}
}
}
return false;

if (ret_val && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

return ret_val;
}

bool StatelessReader::matched_writer_is_matched(
Expand Down
170 changes: 170 additions & 0 deletions test/blackbox/common/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "BlackboxTests.hpp"

#include <string>
#include <thread>

#include "PubSubReader.hpp"
Expand Down Expand Up @@ -2039,6 +2040,175 @@ TEST(LivelinessTests, correct_liveliness_state_one_writer_multiple_readers)
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(2, std::chrono::seconds(4)), 2u);
}

/**
* This is a regression test for redmine issue #21189.
*
* The test ensures that liveliness changed status is not affected by writers on a topic different from
* the one of the reader.
*
* The test creates two readers and two writers, each reader and writer pair on a different topic.
* Writing a sample on one writer should not affect the liveliness changed status of the other reader.
* Destroying the writer should not affect the liveliness changed status of the other reader.
*/
static void test_liveliness_qos_independent_topics(
const std::string& topic_name,
eprosima::fastdds::dds::ReliabilityQosPolicyKind reliability_kind)
{
const auto lease_dutation_time = std::chrono::seconds(1);
const eprosima::fastrtps::Duration_t lease_duration(1, 0);
const eprosima::fastrtps::Duration_t announcement_period(0, 250000000);

PubSubReader<HelloWorldPubSubType> reader1(topic_name + "1");
PubSubReader<HelloWorldPubSubType> reader2(topic_name + "2");

PubSubWriter<HelloWorldPubSubType> writer1(topic_name + "1");
PubSubWriter<HelloWorldPubSubType> writer2(topic_name + "2");

// Configure and start the readers
reader1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.reliability(reliability_kind)
.init();
reader2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.reliability(reliability_kind)
.init();

// Configure and start the writers
writer1.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.liveliness_announcement_period(announcement_period)
.reliability(reliability_kind)
.init();
writer2.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_lease_duration(lease_duration)
.liveliness_announcement_period(announcement_period)
.reliability(reliability_kind)
.init();

// Wait for discovery
reader1.wait_discovery();
reader2.wait_discovery();
writer1.wait_discovery();
writer2.wait_discovery();

HelloWorldPubSubType::type data;

// Write a sample on writer1 and wait for reader1 to assert writer1's liveliness
writer1.send_sample(data);
reader1.wait_liveliness_recovered();

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Write a sample on writer2 and wait for reader2 to assert writer2's liveliness
writer2.send_sample(data);
reader2.wait_liveliness_recovered();

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer2 and wait twice the lease duration time
writer2.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Start writer2 again and wait for reader2 to assert writer2's liveliness
writer2.init();
reader2.wait_discovery();
writer2.send_sample(data);
reader2.wait_liveliness_recovered(2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer1 and wait twice the lease duration time
writer1.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 1);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

// Destroy writer2 and wait twice the lease duration time
writer2.destroy();
std::this_thread::sleep_for(lease_dutation_time * 2);

// Check liveliness changed status on both readers
{
auto liveliness = reader1.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}

{
auto liveliness = reader2.liveliness_changed_status();
EXPECT_EQ(liveliness.alive_count, 0);
EXPECT_EQ(liveliness.not_alive_count, 0);
}
}

TEST_P(LivelinessQos, IndependentTopics_reliable)
{
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS);
}

TEST_P(LivelinessQos, IndependentTopics_besteffort)
{
test_liveliness_qos_independent_topics(TEST_TOPIC_NAME, eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading
Loading