diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d6483c..8d3426e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,11 +27,12 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo") endif() -project(microxrcedds VERSION "1.3.1") +project(microxrcedds VERSION "2.0.0") set(_client_tag develop) +set(_client_version 2.0.0) set(_agent_tag develop) -set(_gen_tag develop) +set(_gen_tag master) ############################################################################### # Build options. @@ -150,6 +151,9 @@ if(UXRCE_ENABLE_AGENT) -DLIB_INSTALL_DIR:PATH=${LIB_INSTALL_DIR} -DDATA_INSTALL_DIR:PATH=${DATA_INSTALL_DIR} -DUAGENT_BUILD_TESTS:BOOL=${UXRCE_BUILD_TESTS} + -DUAGENT_P2P_CLIENT_TAG:STRING=${_client_tag} + -DUAGENT_P2P_CLIENT_VERSION:STRING=${_client_version} + -DUAGENT_BUILD_EXECUTABLE:BOOL=OFF -DUAGENT_ISOLATED_INSTALL:BOOL=OFF -DGTEST_INDIVIDUAL:BOOL=ON DEPENDS diff --git a/Dockerfile b/Dockerfile index bbab829..b8bc25b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,7 +34,7 @@ RUN cd /uxrce/build && \ -DCMAKE_INSTALL_PREFIX=../install \ -DUXRCE_BUILD_EXAMPLES=ON \ .. &&\ - make && make install + make -j $(nproc) && make install # Prepare Micro XRCE-DDS artifacts RUN cd /uxrce && \ diff --git a/ci/linux/CMakeLists.txt b/ci/linux/CMakeLists.txt index ee6e112..9ee5a57 100644 --- a/ci/linux/CMakeLists.txt +++ b/ci/linux/CMakeLists.txt @@ -24,10 +24,10 @@ include(ExternalProject) include(CheckCCompilerFlag) include(CheckCXXCompilerFlag) -set(_c_flags "-fwrapv -fprofile-arcs -ftest-coverage") -set(_cxx_flags "-fwrapv -fprofile-arcs -ftest-coverage") -set(_exe_linker_flags "-fprofile-arcs -ftest-coverage") -set(_shared_linker_flags "-fprofile-arcs -ftest-coverage") +set(_c_flags "-fwrapv -fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline") +set(_cxx_flags "-fwrapv -fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline") +set(_exe_linker_flags "-fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline") +set(_shared_linker_flags "-fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline") check_cxx_compiler_flag("-fprofile-abs-path" _have_fprofile_abs_path) if(_have_fprofile_abs_path) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 89032ab..6e6f8ec 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,4 +46,5 @@ add_subdirectory(test/interaction_client) add_subdirectory(test/client_agent) add_subdirectory(test/publisher_subscriber) add_subdirectory(test/discovery) +add_subdirectory(test/custom_transports) #add_subdirectory(test/shapes_demo) TODO (julibert): fix client and agent paths. diff --git a/test/profiling/publisher/main.c b/test/profiling/publisher/main.c index dc8c58b..02e7cb0 100644 --- a/test/profiling/publisher/main.c +++ b/test/profiling/publisher/main.c @@ -39,8 +39,7 @@ int main(int args, char** argv) // Transport uxrUDPTransport transport; - uxrUDPPlatform udp_platform; - if(!uxr_init_udp_transport(&transport, &udp_platform, UXR_IPv4, "127.0.0.1", "2020")) + if(!uxr_init_udp_transport(&transport, UXR_IPv4, "127.0.0.1", "2020")) { printf("Error at create transport.\n"); return 1; diff --git a/test/profiling/subscriber/main.c b/test/profiling/subscriber/main.c index 9e37d90..b595968 100644 --- a/test/profiling/subscriber/main.c +++ b/test/profiling/subscriber/main.c @@ -59,8 +59,7 @@ int main(int args, char** argv) // Transport uxrUDPTransport transport; - uxrUDPPlatform udp_platform; - if(!uxr_init_udp_transport(&transport, &udp_platform, UXR_IPv4, "127.0.0.1", "2020")) + if(!uxr_init_udp_transport(&transport, UXR_IPv4, "127.0.0.1", "2020")) { printf("Error at create transport.\n"); return 1; diff --git a/test/test/client_agent/ClientAgentInteraction.cpp b/test/test/client_agent/ClientAgentInteraction.cpp index 99c228e..7a1d17e 100644 --- a/test/test/client_agent/ClientAgentInteraction.cpp +++ b/test/test/client_agent/ClientAgentInteraction.cpp @@ -12,6 +12,8 @@ #include #endif +#include + #include #include @@ -21,6 +23,7 @@ class ClientAgentInteraction : public ::testing::TestWithParam(GetParam())); const float LOST = 0.1f; + static const uint8_t INIT_CLOSE_RETRIES = 20; ClientAgentInteraction() : transport_(std::get<0>(GetParam())) @@ -47,13 +50,26 @@ class ClientAgentInteraction : public ::testing::TestWithParamset_verbose_level(6); ASSERT_TRUE(agent_udp4_->start()); break; + } case Transport::UDP_IPV6_TRANSPORT: + { agent_udp6_.reset(new eprosima::uxr::UDPv6Agent(port, middleware_)); agent_udp6_->set_verbose_level(6); ASSERT_TRUE(agent_udp6_->start()); break; + } case Transport::TCP_IPV4_TRANSPORT: + { agent_tcp4_.reset(new eprosima::uxr::TCPv4Agent(port, middleware_)); agent_tcp4_->set_verbose_level(6); ASSERT_TRUE(agent_tcp4_->start()); break; + } case Transport::TCP_IPV6_TRANSPORT: + { agent_tcp6_.reset(new eprosima::uxr::TCPv6Agent(port, middleware_)); agent_tcp6_->set_verbose_level(6); ASSERT_TRUE(agent_tcp6_->start()); break; + } + case Transport::CUSTOM_WITHOUT_FRAMING: + { + try + { + agent_custom_endpoint_.add_member("index"); + } + catch(const std::exception& /*e*/) + { + // Do nothing + } + + + agent_custom_.reset(new eprosima::uxr::CustomAgent( + "custom_agent", + &agent_custom_endpoint_, + middleware_, + false, + agent_custom_transport_open, + agent_custom_transport_close, + agent_custom_transport_write_packet, + agent_custom_transport_read_packet)); + agent_custom_->set_verbose_level(6); + ASSERT_TRUE(agent_custom_->start()); + break; + } + case Transport::CUSTOM_WITH_FRAMING: + { + try + { + agent_custom_endpoint_.add_member("index"); + } + catch(const std::exception& /*e*/) + { + // Do nothing + } + + agent_custom_.reset(new eprosima::uxr::CustomAgent( + "custom_agent", + &agent_custom_endpoint_, + middleware_, + true, + agent_custom_transport_open, + agent_custom_transport_close, + agent_custom_transport_write_stream, + agent_custom_transport_read_stream)); + agent_custom_->set_verbose_level(6); + ASSERT_TRUE(agent_custom_->start()); + break; + } + } } @@ -96,17 +170,31 @@ class ClientAgentInteraction : public ::testing::TestWithParamstop()); break; + } case Transport::UDP_IPV6_TRANSPORT: + { ASSERT_TRUE(agent_udp6_->stop()); break; + } case Transport::TCP_IPV4_TRANSPORT: + { ASSERT_TRUE(agent_tcp4_->stop()); break; + } case Transport::TCP_IPV6_TRANSPORT: + { ASSERT_TRUE(agent_tcp6_->stop()); - break; + break; + } + case Transport::CUSTOM_WITHOUT_FRAMING: + case Transport::CUSTOM_WITH_FRAMING: + { + ASSERT_TRUE(agent_custom_->stop()); + break; + } } } @@ -116,13 +204,16 @@ class ClientAgentInteraction : public ::testing::TestWithParam agent_udp6_; std::unique_ptr agent_tcp4_; std::unique_ptr agent_tcp6_; + std::unique_ptr agent_custom_; + eprosima::uxr::CustomEndPoint agent_custom_endpoint_; + eprosima::uxr::Middleware::Kind middleware_; Client client_; }; TEST_P(ClientAgentInteraction, InitCloseSession) { - for (int i = 0; i < 20; ++i) + for (int i = 0; i < ClientAgentInteraction::INIT_CLOSE_RETRIES; ++i) { TearDown(); SetUp(); @@ -133,15 +224,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationXMLBestEffort) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0)); + break; + } } } @@ -149,15 +246,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationXMLReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + break; + } } } @@ -165,15 +268,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationREFBestEffort) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0)); + break; + } } } @@ -181,15 +290,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationREFReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + break; + } } } @@ -197,18 +312,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseXMLXMLReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } } } @@ -224,18 +345,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseREFREFReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE)); + break; + } } } @@ -243,18 +370,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE)); + break; + } } } @@ -262,18 +395,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0)); + break; + } } } @@ -281,21 +420,33 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseReliable) { switch (std::get<1>(GetParam())) { - case MiddlewareKind::FASTDDS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); - break; - case MiddlewareKind::FASTRTPS: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); - break; - case MiddlewareKind::CED: - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); - ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); - break; + case MiddlewareKind::FASTDDS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); + break; + } + case MiddlewareKind::FASTRTPS: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); + break; + } + case MiddlewareKind::CED: + { + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0)); + ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE)); + break; + } } } +TEST_P(ClientAgentInteraction, PingFromClientToAgent) +{ + const Transport transport_kind(std::get<0>(GetParam())); + ASSERT_NO_FATAL_FAILURE(client_.ping_agent(transport_kind)); +} + INSTANTIATE_TEST_CASE_P( Transports, ClientAgentInteraction, @@ -303,6 +454,13 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(Transport::UDP_IPV4_TRANSPORT, Transport::TCP_IPV4_TRANSPORT, Transport::UDP_IPV6_TRANSPORT, Transport::TCP_IPV6_TRANSPORT), ::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED))); +INSTANTIATE_TEST_CASE_P( + CustomTransports, + ClientAgentInteraction, + ::testing::Combine( + ::testing::Values(Transport::CUSTOM_WITHOUT_FRAMING, Transport::CUSTOM_WITH_FRAMING), + ::testing::Values(MiddlewareKind::FASTDDS))); + int main(int args, char** argv) { ::testing::InitGoogleTest(&args, argv); diff --git a/test/test/cross_serialization/ClientSerialization.cpp b/test/test/cross_serialization/ClientSerialization.cpp index 917027d..41d342e 100644 --- a/test/test/cross_serialization/ClientSerialization.cpp +++ b/test/test/cross_serialization/ClientSerialization.cpp @@ -148,7 +148,7 @@ std::vector ClientSerialization::info_payload() payload.object_info.config._.agent.xrce_version = XrceVersion{0x01, 0x23}; payload.object_info.config._.agent.xrce_vendor_id = XrceVendorId{0x45, 0x67}; payload.object_info.activity.kind = DDS_XRCE_OBJK_AGENT; - payload.object_info.activity._.agent.availibility = 1; + payload.object_info.activity._.agent.availability = 1; payload.object_info.activity._.agent.address_seq.size = 0x01; payload.object_info.activity._.agent.address_seq.data[0].format = ADDRESS_FORMAT_MEDIUM; payload.object_info.activity._.agent.address_seq.data[0]._.medium_locator.locator_port = 0x0123; diff --git a/test/test/custom_transports/CMakeLists.txt b/test/test/custom_transports/CMakeLists.txt new file mode 100644 index 0000000..3e16661 --- /dev/null +++ b/test/test/custom_transports/CMakeLists.txt @@ -0,0 +1,62 @@ +# Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# include(${PROJECT_SOURCE_DIR}/cmake/common/check_configuration.cmake) + +# cmake_host_system_information(RESULT HOSTNAME_SUFFIX QUERY HOSTNAME) + +# configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../common/EntitiesInfo.hpp.in +# ${CMAKE_CURRENT_BINARY_DIR}/EntitiesInfo.hpp +# @ONLY +# ) + +set(SRCS + Custom_transports.cpp + ) + +add_library(custom_transports STATIC ${SRCS}) + +set_common_compile_options(custom_transports) + +if(MSVC OR MSVC_IDE) + target_compile_options(custom_transports + PRIVATE + /wd4996 + PUBLIC + -D_CRT_SECURE_NO_WARNINGS + ) +endif() + +target_include_directories(custom_transports + PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + PRIVATE + ${GTEST_INCLUDE_DIRS} + ) + +target_link_libraries(custom_transports + PUBLIC + microxrcedds_client + microxrcedds_agent + PRIVATE + ${GTEST_BOTH_LIBRARIES} + ) + +set_target_properties(custom_transports PROPERTIES + CXX_STANDARD + 11 + CXX_STANDARD_REQUIRED + YES + ) diff --git a/test/test/custom_transports/Custom_transports.cpp b/test/test/custom_transports/Custom_transports.cpp new file mode 100644 index 0000000..19013b7 --- /dev/null +++ b/test/test/custom_transports/Custom_transports.cpp @@ -0,0 +1,330 @@ +#include "Custom_transports.hpp" +#include + +#include +#include +#include + +using packet_fifo = std::queue>; +using stream_fifo = std::queue; + +static std::map client_to_agent_packet_queue; +static std::map agent_to_client_packet_queue; +static std::map client_to_agent_stream_queue; +static std::map agent_to_client_stream_queue; + +std::mutex transport_mtx; + +template +static int32_t find_queue_with_data(const std::map& m) +{ + for (auto const& it : m) + { + if (!it.second.empty()) + { + return it.first; + } + } + + return -1; +} + +template +static void erase_fifo_by_index(std::map& m, const int32_t index) +{ + auto it = m.find(index); + if (it != m.end()) + m.erase (it); +} + +eprosima::uxr::CustomAgent::InitFunction agent_custom_transport_open = []() -> bool +{ + return true; +}; + +eprosima::uxr::CustomAgent::FiniFunction agent_custom_transport_close = []() -> bool +{ + std::unique_lock lock(transport_mtx); + + client_to_agent_stream_queue.clear(); + client_to_agent_packet_queue.clear(); + agent_to_client_stream_queue.clear(); + agent_to_client_stream_queue.clear(); + + return true; +}; + +eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_packet = []( + eprosima::uxr::CustomEndPoint* source_endpoint, + uint8_t* buffer, + size_t buffer_length, + int timeout, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t +{ + size_t rv = 0; + int64_t init_time = uxr_millis(); + bool received = false; + + transport_rc = eprosima::uxr::TransportRc::ok; + + while (uxr_millis() - init_time < timeout) + { + std::unique_lock lock(transport_mtx); + + int32_t index = find_queue_with_data(client_to_agent_packet_queue); + if (0 <= index) + { + auto data = client_to_agent_packet_queue[index].front(); + client_to_agent_packet_queue[index].pop(); + + if (data.size() <= buffer_length) + { + std::copy(data.begin(), data.end(), buffer); + rv = data.size(); + received = true; + std::cout << "Custom agent receive: " << rv << " bytes in queue " << index << std::endl; + source_endpoint->set_member_value("index", static_cast(index)); + } + else + { + transport_rc = eprosima::uxr::TransportRc::server_error; + } + + break; + } + + lock.unlock(); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + if (!received) + { + transport_rc = eprosima::uxr::TransportRc::timeout_error; + } + + return static_cast(rv); +}; + +eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_packet = []( + const eprosima::uxr::CustomEndPoint* destination_endpoint, + uint8_t* buffer, + size_t message_length, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t +{ + std::unique_lock lock(transport_mtx); + int32_t index = static_cast(destination_endpoint->get_member("index")); + + std::vector packet(buffer, buffer + message_length); + agent_to_client_packet_queue[index].emplace(std::move(packet)); + transport_rc = eprosima::uxr::TransportRc::ok; + std::cout << "Custom agent send: " << message_length << " bytes." << std::endl; + + return static_cast(message_length); +}; + +eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_stream = []( + eprosima::uxr::CustomEndPoint* source_endpoint, + uint8_t* buffer, + size_t buffer_length, + int timeout, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t +{ + size_t rv = 0; + int64_t init_time = uxr_millis(); + bool received = false; + + transport_rc = eprosima::uxr::TransportRc::ok; + + while (uxr_millis() - init_time < timeout) + { + std::unique_lock lock(transport_mtx); + + int32_t index = find_queue_with_data(client_to_agent_stream_queue); + if (0 <= index) + { + rv = (buffer_length > client_to_agent_stream_queue[index].size()) ? + client_to_agent_stream_queue[index].size() : + buffer_length; + + for (size_t i = 0; i < rv; i++) + { + buffer[i] = client_to_agent_stream_queue[index].front(); + client_to_agent_stream_queue[index].pop(); + } + + std::cout << "Custom agent receive: " << rv << " bytes in queue " << index << std::endl; + + source_endpoint->set_member_value("index", static_cast(index)); + received = true; + + break; + } + + lock.unlock(); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + if (!received) + { + transport_rc = eprosima::uxr::TransportRc::timeout_error; + } + + return static_cast(rv); +}; + +eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_stream = []( + const eprosima::uxr::CustomEndPoint* destination_endpoint, + uint8_t* buffer, + size_t message_length, + eprosima::uxr::TransportRc& transport_rc) -> ssize_t +{ + std::unique_lock lock(transport_mtx); + int32_t index = static_cast(destination_endpoint->get_member("index")); + + for (size_t i = 0; i < message_length; i++) + { + agent_to_client_stream_queue[index].emplace(buffer[i]); + } + + transport_rc = eprosima::uxr::TransportRc::ok; + std::cout << "Custom agent send: " << message_length << " bytes to queue " << index << std::endl; + + return static_cast(message_length); +}; + + +// Client custom transport +extern "C" +{ + static int32_t global_index = 0; + + bool client_custom_transport_open(uxrCustomTransport* transport) + { + transport->args = malloc(sizeof(int32_t)); + *(int32_t*) transport->args = global_index++; + int32_t index = *(int32_t*) transport->args; + + std::cout << "Custom client creating: " << index << std::endl; + + return true; + } + + bool client_custom_transport_close(uxrCustomTransport* transport) + { + int32_t index = *(int32_t*) transport->args; + free(transport->args); + + std::unique_lock lock(transport_mtx); + + erase_fifo_by_index(client_to_agent_packet_queue, index); + erase_fifo_by_index(client_to_agent_stream_queue, index); + erase_fifo_by_index(agent_to_client_packet_queue, index); + erase_fifo_by_index(agent_to_client_stream_queue, index); + + return true; + } + + size_t client_custom_transport_write_packet(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode) + { + (void) errcode; + + int32_t index = *(int32_t*) transport->args; + + std::unique_lock lock(transport_mtx); + + std::vector packet(buf, buf + len); + client_to_agent_packet_queue[index].emplace(std::move(packet)); + std::cout << "Custom client send: " << len << " bytes in queue " << index << std::endl; + + return len; + } + size_t client_custom_transport_read_packet(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode) + { + (void) errcode; + + int32_t index = *(int32_t*) transport->args; + + size_t rv = 0; + int64_t init_time = uxr_millis(); + + while (uxr_millis() - init_time < timeout) + { + std::unique_lock lock(transport_mtx); + + if (0 < agent_to_client_packet_queue[index].size()) + { + auto data = agent_to_client_packet_queue[index].front(); + agent_to_client_packet_queue[index].pop(); + + if (data.size() <= len) + { + std::copy( data.begin(), data.end(), buf); + rv = data.size(); + std::cout << "Custom client receive: " << len << " bytes in queue " << index << std::endl; + } + else + { + *errcode = 1; + } + + break; + } + + lock.unlock(); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + return rv; + } + + size_t client_custom_transport_write_stream(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode) + { + (void) errcode; + + int32_t index = *(int32_t*) transport->args; + + std::unique_lock lock(transport_mtx); + + for (size_t i = 0; i < len; i++) + { + client_to_agent_stream_queue[index].emplace(buf[i]); + } + + std::cout << "Custom client send: " << len << " bytes in queue " << index << std::endl; + + return len; + } + size_t client_custom_transport_read_stream(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode) + { + (void) errcode; + + int32_t index = *(int32_t*) transport->args; + + size_t rv = 0; + int64_t init_time = uxr_millis(); + + while (uxr_millis() - init_time < timeout) + { + std::unique_lock lock(transport_mtx); + + if (0 < agent_to_client_stream_queue[index].size()) + { + rv = (len > agent_to_client_stream_queue[index].size()) ? + agent_to_client_stream_queue[index].size() : + len; + + for (size_t i = 0; i < rv; i++) + { + buf[i] = agent_to_client_stream_queue[index].front(); + agent_to_client_stream_queue[index].pop(); + } + + break; + } + lock.unlock(); + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + return rv; + } +} diff --git a/test/test/custom_transports/Custom_transports.hpp b/test/test/custom_transports/Custom_transports.hpp new file mode 100644 index 0000000..341c355 --- /dev/null +++ b/test/test/custom_transports/Custom_transports.hpp @@ -0,0 +1,26 @@ +#ifndef IN_TEST_CUSTOM_TRANSPORT_HPP +#define IN_TEST_CUSTOM_TRANSPORT_HPP + +#include +#include + +// Agent custom transports +extern eprosima::uxr::CustomAgent::InitFunction agent_custom_transport_open; +extern eprosima::uxr::CustomAgent::FiniFunction agent_custom_transport_close; +extern eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_stream; +extern eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_stream; +extern eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_packet; +extern eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_packet; + +// Client custom transport +extern "C" +{ + bool client_custom_transport_open(uxrCustomTransport* transport); + bool client_custom_transport_close(uxrCustomTransport* transport); + size_t client_custom_transport_write_stream( uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode); + size_t client_custom_transport_read_stream( uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode); + size_t client_custom_transport_write_packet( uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode); + size_t client_custom_transport_read_packet( uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode); +} + +#endif //IN_TEST_CUSTOM_TRANSPORT_HPP diff --git a/test/test/interaction_client/BigHelloWorld.h b/test/test/interaction_client/BigHelloWorld.h index 2f9cdac..4cfbd90 100644 --- a/test/test/interaction_client/BigHelloWorld.h +++ b/test/test/interaction_client/BigHelloWorld.h @@ -37,7 +37,7 @@ extern "C" typedef struct BigHelloWorld { uint32_t index; - char message[4096]; + char message[8192]; } BigHelloWorld; diff --git a/test/test/interaction_client/CMakeLists.txt b/test/test/interaction_client/CMakeLists.txt index e0d2aae..1514a7f 100644 --- a/test/test/interaction_client/CMakeLists.txt +++ b/test/test/interaction_client/CMakeLists.txt @@ -51,6 +51,7 @@ target_include_directories(interaction_client target_link_libraries(interaction_client PUBLIC microxrcedds_client + custom_transports PRIVATE ${GTEST_BOTH_LIBRARIES} ) diff --git a/test/test/interaction_client/Client.hpp b/test/test/interaction_client/Client.hpp index 6a431e3..a167b9d 100644 --- a/test/test/interaction_client/Client.hpp +++ b/test/test/interaction_client/Client.hpp @@ -4,19 +4,24 @@ #include "BigHelloWorld.h" #include "Gateway.hpp" #include +#include <../custom_transports/Custom_transports.hpp> #include +#include #include #include #include #include -enum class Transport { +enum class Transport +{ UDP_IPV4_TRANSPORT, UDP_IPV6_TRANSPORT, TCP_IPV4_TRANSPORT, - TCP_IPV6_TRANSPORT + TCP_IPV6_TRANSPORT, + CUSTOM_WITH_FRAMING, + CUSTOM_WITHOUT_FRAMING }; inline bool operator == (const uxrObjectId& obj1, const uxrObjectId& obj2) @@ -33,6 +38,9 @@ inline bool operator == (const uxrStreamId& s1, const uxrStreamId& s2) && s1.direction == s2.direction; } +extern "C" bool flush_session(uxrSession* session){ + return uxr_run_session_until_confirm_delivery(session, 1000); +} class Client { @@ -197,8 +205,14 @@ class Client ucdrBuffer ub; uint32_t topic_size = BigHelloWorld_size_of_topic(&topic, 0); - bool prepared = uxr_prepare_output_stream(&session_, output_stream_id, datawriter_id, &ub, topic_size); - ASSERT_TRUE(prepared); + uint16_t prepared = false; + if (topic_size < mtu_) + { + prepared = uxr_prepare_output_stream(&session_, output_stream_id, datawriter_id, &ub, topic_size); + } else { + prepared = uxr_prepare_output_stream_fragmented(&session_, output_stream_id, datawriter_id, &ub, topic_size, flush_session); + } + ASSERT_NE(prepared, UXR_INVALID_REQUEST_ID); bool written = BigHelloWorld_serialize_topic(&ub, &topic); ASSERT_TRUE(written); ASSERT_FALSE(ub.error); @@ -249,24 +263,52 @@ class Client { case Transport::UDP_IPV4_TRANSPORT: mtu_ = UXR_CONFIG_UDP_TRANSPORT_MTU; - ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, &udp_platform_, UXR_IPv4, ip, port)); + ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, UXR_IPv4, ip, port)); uxr_init_session(&session_, gateway_.monitorize(&udp_transport_.comm), client_key_); break; case Transport::UDP_IPV6_TRANSPORT: mtu_ = UXR_CONFIG_UDP_TRANSPORT_MTU; - ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, &udp_platform_, UXR_IPv6, ip, port)); + ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, UXR_IPv6, ip, port)); uxr_init_session(&session_, gateway_.monitorize(&udp_transport_.comm), client_key_); break; case Transport::TCP_IPV4_TRANSPORT: mtu_ = UXR_CONFIG_TCP_TRANSPORT_MTU; - ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, &tcp_platform_, UXR_IPv4, ip, port)); + ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, UXR_IPv4, ip, port)); uxr_init_session(&session_, gateway_.monitorize(&tcp_transport_.comm), client_key_); break; case Transport::TCP_IPV6_TRANSPORT: mtu_ = UXR_CONFIG_TCP_TRANSPORT_MTU; - ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, &tcp_platform_, UXR_IPv6, ip, port)); + ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, UXR_IPv6, ip, port)); uxr_init_session(&session_, gateway_.monitorize(&tcp_transport_.comm), client_key_); break; + case Transport::CUSTOM_WITHOUT_FRAMING: + mtu_ = UXR_CONFIG_CUSTOM_TRANSPORT_MTU; + + uxr_set_custom_transport_callbacks( + &custom_transport_, + false, + client_custom_transport_open, + client_custom_transport_close, + client_custom_transport_write_packet, + client_custom_transport_read_packet); + + ASSERT_TRUE(uxr_init_custom_transport(&custom_transport_, NULL)); + uxr_init_session(&session_, gateway_.monitorize(&custom_transport_.comm), client_key_); + break; + case Transport::CUSTOM_WITH_FRAMING: + mtu_ = UXR_CONFIG_CUSTOM_TRANSPORT_MTU; + + uxr_set_custom_transport_callbacks( + &custom_transport_, + true, + client_custom_transport_open, + client_custom_transport_close, + client_custom_transport_write_stream, + client_custom_transport_read_stream); + + ASSERT_TRUE(uxr_init_custom_transport(&custom_transport_, NULL)); + uxr_init_session(&session_, gateway_.monitorize(&custom_transport_.comm), client_key_); + break; } init_common(); @@ -295,6 +337,10 @@ class Client case Transport::TCP_IPV6_TRANSPORT: ASSERT_TRUE(uxr_close_tcp_transport(&tcp_transport_)); break; + case Transport::CUSTOM_WITHOUT_FRAMING: + case Transport::CUSTOM_WITH_FRAMING: + ASSERT_TRUE(uxr_close_custom_transport(&custom_transport_)); + break; } } @@ -303,6 +349,35 @@ class Client return mtu_; } + void ping_agent( + const Transport transport_kind) + { + uxrCommunication* comm(nullptr); + + switch (transport_kind) + { + case Transport::UDP_IPV4_TRANSPORT: + case Transport::UDP_IPV6_TRANSPORT: + { + comm = &udp_transport_.comm; + break; + } + case Transport::TCP_IPV4_TRANSPORT: + case Transport::TCP_IPV6_TRANSPORT: + { + comm = &tcp_transport_.comm; + break; + } + case Transport::CUSTOM_WITHOUT_FRAMING: + case Transport::CUSTOM_WITH_FRAMING: + { + comm = &custom_transport_.comm; + break; + } + } + ASSERT_TRUE(uxr_ping_agent_attempts(comm, 1000, 1)); + } + private: void init_common() { @@ -315,12 +390,15 @@ class Client ASSERT_EQ(UXR_STATUS_OK, session_.info.last_requested_status); /* Setup streams. */ - output_best_effort_stream_buffer_.reset(new uint8_t[mtu_ * UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS]{0}); - output_reliable_stream_buffer_.reset(new uint8_t[mtu_ * history_ * UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS]{0}); - input_reliable_stream_buffer_.reset(new uint8_t[mtu_ * history_ * UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS]{0}); + output_best_effort_stream_buffer_.reset( + new std::vector(mtu_ * UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS, 0)); + output_reliable_stream_buffer_.reset( + new std::vector(mtu_ * history_ * UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS, 0)); + input_reliable_stream_buffer_.reset( + new std::vector(mtu_ * history_ * UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS, 0)); for(size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS; ++i) { - uint8_t* buffer = output_best_effort_stream_buffer_.get() + mtu_ * i; + uint8_t* buffer = output_best_effort_stream_buffer_->data() + mtu_ * i; (void) uxr_create_output_best_effort_stream(&session_, buffer, mtu_); } for(size_t i = 0; i < UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS; ++i) @@ -329,12 +407,12 @@ class Client } for(size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS; ++i) { - uint8_t* buffer = output_reliable_stream_buffer_.get() + mtu_ * history_ * i; + uint8_t* buffer = output_reliable_stream_buffer_->data() + mtu_ * history_ * i; (void) uxr_create_output_reliable_stream(&session_, buffer , mtu_ * history_, history_); } for(size_t i = 0; i < UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS; ++i) { - uint8_t* buffer = input_reliable_stream_buffer_.get() + mtu_ * history_ * i; + uint8_t* buffer = input_reliable_stream_buffer_->data() + mtu_ * history_ * i; (void) uxr_create_input_reliable_stream(&session_, buffer, mtu_ * history_, history_); } } @@ -377,12 +455,6 @@ class Client } static uint32_t next_client_key_; - static const char* participant_xml_; - static const char* topic_xml_; - static const char* publisher_xml_; - static const char* subscriber_xml_; - static const char* datawriter_xml_; - static const char* datareader_xml_; Gateway gateway_; @@ -390,16 +462,15 @@ class Client uint16_t history_; uxrUDPTransport udp_transport_; - uxrUDPPlatform udp_platform_; uxrTCPTransport tcp_transport_; - uxrTCPPlatform tcp_platform_; + uxrCustomTransport custom_transport_; size_t mtu_; uxrSession session_; - std::unique_ptr output_best_effort_stream_buffer_; - std::unique_ptr output_reliable_stream_buffer_; - std::unique_ptr input_reliable_stream_buffer_; + std::shared_ptr> output_best_effort_stream_buffer_; + std::shared_ptr> output_reliable_stream_buffer_; + std::shared_ptr> input_reliable_stream_buffer_; std::string expected_message_; diff --git a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp index d17621d..67ee1dc 100644 --- a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp +++ b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp @@ -13,6 +13,9 @@ #include #endif +#include + +#include <../custom_transports/Custom_transports.hpp> #include @@ -52,11 +55,16 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParam(GetParam())) { @@ -105,6 +113,36 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParamset_verbose_level(6); ASSERT_TRUE(agent_tcp6_->start()); break; + case Transport::CUSTOM_WITHOUT_FRAMING: + agent_custom_endpoint_.add_member("index"); + + agent_custom_.reset(new eprosima::uxr::CustomAgent( + "custom_agent", + &agent_custom_endpoint_, + middleware_, + false, + agent_custom_transport_open, + agent_custom_transport_close, + agent_custom_transport_write_packet, + agent_custom_transport_read_packet)); + agent_custom_->set_verbose_level(6); + ASSERT_TRUE(agent_custom_->start()); + break; + case Transport::CUSTOM_WITH_FRAMING: + agent_custom_endpoint_.add_member("index"); + + agent_custom_.reset(new eprosima::uxr::CustomAgent( + "custom_agent", + &agent_custom_endpoint_, + middleware_, + true, + agent_custom_transport_open, + agent_custom_transport_close, + agent_custom_transport_write_stream, + agent_custom_transport_read_stream)); + agent_custom_->set_verbose_level(6); + ASSERT_TRUE(agent_custom_->start()); + break; } } @@ -123,6 +161,9 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParam agent_udp6_; std::unique_ptr agent_tcp4_; std::unique_ptr agent_tcp6_; + std::unique_ptr agent_custom_; + eprosima::uxr::CustomEndPoint agent_custom_endpoint_; + eprosima::uxr::Middleware::Kind middleware_; Client publisher_; Client subscriber_; @@ -157,6 +198,12 @@ TEST_P(PublisherSubscriberNoLost, PubSub10TopicsReliable) check_messages(SMALL_MESSAGE, 10, 0x80); } +TEST_P(PublisherSubscriberNoLost, PubSub1ContinousFragmentedTopic) +{ + std::string message(size_t(publisher_.get_mtu() * 8), 'A'); + publisher_.publish(1, 0x80, 1, message); +} + // TODO (#4423) Fix the non-reliable behavior when messages is higher than the agent history to enable this /*TEST_P(PublisherSubscriberNoLost, PubSub30TopicsReliable) { @@ -172,6 +219,14 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED), ::testing::Values(0.0f))); +INSTANTIATE_TEST_CASE_P( + TransportAndLostCustomTransports, + PublisherSubscriberNoLost, + ::testing::Combine( + ::testing::Values(Transport::CUSTOM_WITH_FRAMING, Transport::CUSTOM_WITHOUT_FRAMING), + ::testing::Values(MiddlewareKind::FASTDDS), + ::testing::Values(0.0f))); + TEST_P(PublisherSubscriberLost, PubSub1FragmentedTopic2Parts) { std::string message(size_t(publisher_.get_mtu() * 1.5), 'A');