Skip to content

Commit

Permalink
TCPSendResources cleanup (#4300)
Browse files Browse the repository at this point in the history
* Refs #20120: Remove unused include

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: TCP event call

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Sanitize transport

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Added tests and minor fixes

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Extended doxygen description and added to versions.md

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Uncrustify

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Add missing header

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix tests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Uncrustify

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: After client-server decision making rebase, not working

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Update

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix for chaining-transports

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Add new channel connection status and tests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: PR refactor, timed event deleted. cleanup on pdp unbinding

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Uncrustify

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Add unittests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix deadlock

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix unittest

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix asio throwing exceptions

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Unnittest untab

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Apply suggestions

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Uncrustify

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Consider wan case + associated tests

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Remove versions.md update

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Fix rebasing wrong deletion

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Delete assert clause

Signed-off-by: Jesus Perez <[email protected]>

* Refs #20120: Apply suggestions

Signed-off-by: EduPonz <[email protected]>

---------

Signed-off-by: Jesus Perez <[email protected]>
Signed-off-by: EduPonz <[email protected]>
Co-authored-by: EduPonz <[email protected]>
  • Loading branch information
jepemi and EduPonz committed Mar 13, 2024
1 parent 1c890fe commit 21e6a64
Show file tree
Hide file tree
Showing 22 changed files with 755 additions and 638 deletions.
18 changes: 18 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <fastrtps/types/TypeObjectFactory.h>
#include <fastrtps/types/DynamicPubSubType.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/utils/IPLocator.h>
#include "fastrtps/utils/shared_mutex.hpp"
Expand Down Expand Up @@ -1239,6 +1241,21 @@ bool PDP::remove_remote_participant(

this->mp_mutex->lock();

// Delete from sender resource list (TCP only)
LocatorList_t remote_participant_locators;
for (auto& remote_participant_default_locator : pdata->default_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_default_locator);
}
for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_metatraffic_locator);
}
if (!remote_participant_locators.empty())
{
mp_RTPSParticipant->update_removed_participant(remote_participant_locators);
}

// Return reader proxy objects to pool
for (auto pit : *pdata->m_readers)
{
Expand Down Expand Up @@ -1266,6 +1283,7 @@ bool PDP::remove_remote_participant(
participant_proxies_pool_.push_back(pdata);

this->mp_mutex->unlock();

return true;
}

Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#include <utility>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/transport/TransportDescriptorInterface.h>
#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/IPLocator.h>

#include <rtps/transport/UDPv4Transport.h>
#include <rtps/transport/TCPTransportInterface.h>

using namespace std;
using namespace eprosima::fastdds::rtps;
Expand Down Expand Up @@ -471,6 +472,24 @@ void NetworkFactory::update_network_interfaces()
}
}

void NetworkFactory::remove_participant_associated_send_resources(
SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const
{
for (auto& transport : mRegisteredTransports)
{
TCPTransportInterface* tcp_transport = dynamic_cast<TCPTransportInterface*>(transport.get());
if (tcp_transport)
{
tcp_transport->CloseOutputChannel(
send_resource_list,
remote_participant_locators,
participant_initial_peers);
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
13 changes: 13 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>

#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/SenderResource.h>
Expand Down Expand Up @@ -246,6 +247,18 @@ class NetworkFactory
*/
void update_network_interfaces();

/**
* Remove the given participants from the send resource list
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators List of locators associated to the remote participant.
* @param participant_initial_peers List of locators of the initial peers of the local participant.
*/
void remove_participant_associated_send_resources(
fastdds::rtps::SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const;

private:

std::vector<std::unique_ptr<fastdds::rtps::TransportInterface>> mRegisteredTransports;
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include <fastdds/rtps/writer/StatelessPersistentWriter.h>
#include <fastdds/rtps/writer/StatefulPersistentWriter.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>
Expand Down Expand Up @@ -2982,6 +2984,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints(
return should_match_local_endpoints;
}

void RTPSParticipantImpl::update_removed_participant(
const LocatorList_t& remote_participant_locators)
{
if (!remote_participant_locators.empty())
{
std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_);
m_network_Factory.remove_participant_associated_send_resources(
send_resource_list_,
remote_participant_locators,
m_att.builtin.initialPeersList);
}
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/messages/MessageReceiver.h>
Expand Down Expand Up @@ -1266,6 +1267,14 @@ class RTPSParticipantImpl
return match_local_endpoints_;
}

/**
* Method called on participant removal with the set of locators associated to the participant.
*
* @param remote_participant_locators Set of locators associated to the participant removed.
*/
void update_removed_participant(
const LocatorList_t& remote_participant_locators);

};
} // namespace rtps
} /* namespace rtps */
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(locator_);
transport.SenderResourceHasBeenClosed(locator_);
};

send_lambda_ = [this, &transport](
Expand Down Expand Up @@ -68,7 +68,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}

static TCPSenderResource* cast(
TransportInterface& transport,
const TransportInterface& transport,
SenderResource* sender_resource)
{
TCPSenderResource* returned_resource = nullptr;
Expand Down
100 changes: 88 additions & 12 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <cstring>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -273,6 +274,19 @@ Locator TCPTransportInterface::remote_endpoint_to_locator(
return locator;
}

Locator TCPTransportInterface::local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const
{
Locator locator;
asio::error_code ec;
endpoint_to_locator(channel->local_endpoint(ec), locator);
if (ec)
{
LOCATOR_INVALID(locator);
}
return locator;
}

void TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
Expand Down Expand Up @@ -639,11 +653,24 @@ bool TCPTransportInterface::transform_remote_locator(
return false;
}

void TCPTransportInterface::CloseOutputChannel(
void TCPTransportInterface::SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator)
{
locator.set_Invalid_Address();
locator.port = 0;
// The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction
// this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the
// socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already
// connected.
// If moving this unbind send with the respective channel disconnection to this point, the following problem arises:
// If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant
// isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has
// taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock).
// Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have
// already been destroyed and the listening thread had consequently finished).
// An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done
// via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from
// the client.
// The send resource locator is invalidated to prevent further use of associated channel.
LOCATOR_INVALID(locator);
}

bool TCPTransportInterface::CloseInputChannel(
Expand Down Expand Up @@ -1192,7 +1219,6 @@ bool TCPTransportInterface::Receive(
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
Expand Down Expand Up @@ -1441,10 +1467,8 @@ void TCPTransportInterface::SocketAccepted(
create_listening_thread(channel);

EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: "
<< channel->local_endpoint().address() << ":"
<< channel->local_endpoint().port() << "), remote: "
<< channel->remote_endpoint().address() << ":"
<< channel->remote_endpoint().port() << ")");
<< local_endpoint_to_locator(channel) << ", remote: "
<< remote_endpoint_to_locator(channel) << ")");
}
else
{
Expand Down Expand Up @@ -1486,10 +1510,8 @@ void TCPTransportInterface::SecureSocketAccepted(
create_listening_thread(secure_channel);

EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: "
<< socket->lowest_layer().local_endpoint().address() << ":"
<< socket->lowest_layer().local_endpoint().port() << "), remote: "
<< socket->lowest_layer().remote_endpoint().address() << ":"
<< socket->lowest_layer().remote_endpoint().port() << ")");
<< local_endpoint_to_locator(secure_channel) << ", remote: "
<< remote_endpoint_to_locator(secure_channel) << ")");
}
else
{
Expand Down Expand Up @@ -1842,6 +1864,60 @@ void TCPTransportInterface::fill_local_physical_port(
}
}

void TCPTransportInterface::CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const
{
// Since send resources handle physical locators, we need to convert the remote participant locators to physical
std::set<Locator> remote_participant_physical_locators;
for (const Locator& remote_participant_locator : remote_participant_locators)
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator));

// Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario,
//initial peer can also work with the WANtoLANLocator of the remote participant.
if (IPLocator::hasWan(remote_participant_locator))
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator(
remote_participant_locator)));
}
}

// Exlude initial peers.
for (const auto& initial_peer : participant_initial_peers)
{
if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(),
IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end())
{
remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer));
}
}

for (const auto& remote_participant_physical_locator : remote_participant_physical_locators)
{
if (!IsLocatorSupported(remote_participant_physical_locator))
{
continue;
}
// Remove send resources for the associated remote participant locator
for (auto it = send_resource_list.begin(); it != send_resource_list.end();)
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get());

if (tcp_sender_resource)
{
if (tcp_sender_resource->locator() == remote_participant_physical_locator)
{
it = send_resource_list.erase(it);
continue;
}
}
++it;
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
20 changes: 19 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ class TCPTransportInterface : public TransportInterface
Locator remote_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Converts a local endpoint to a locator if possible. Otherwise, it sets an invalid locator.
*/
Locator local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Shutdown method to close the connections of the transports.
*/
Expand Down Expand Up @@ -241,7 +247,7 @@ class TCPTransportInterface : public TransportInterface
const Locator&) override;

//! Resets the locator bound to the sender resource.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator);

//! Reports whether Locators correspond to the same port.
Expand Down Expand Up @@ -478,6 +484,18 @@ class TCPTransportInterface : public TransportInterface
return non_blocking_send_;
}

/**
* Close the output channel associated to the given remote participant but if its locators belong to the
* given list of initial peers.
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators Set of locators associated to the remote participant.
* @param participant_initial_peers List of locators associated to the initial peers of the local participant.
*/
void CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(socket_);
transport.SenderResourceHasBeenClosed(socket_);
};

send_lambda_ = [this, &transport](
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel(
return true;
}

void UDPTransportInterface::CloseOutputChannel(
void UDPTransportInterface::SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket)
{
socket.cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class UDPTransportInterface : public TransportInterface
const Locator&) override;

//! Removes all outbound sockets on the given port.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket);

//! Reports whether Locators correspond to the same port.
Expand Down
Loading

0 comments on commit 21e6a64

Please sign in to comment.