Skip to content

Commit

Permalink
Add C++ unittest to reproduce Morpheus issue #953 which is a variatio…
Browse files Browse the repository at this point in the history
…n on MRC issue nv-morpheus#360 [no ci]
  • Loading branch information
dagardner-nv committed Jan 22, 2024
1 parent 66d3b07 commit 63de5c0
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions cpp/mrc/tests/modules/test_segment_modules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,99 @@ TEST_F(TestSegmentModules, ModuleEndToEndTest)
EXPECT_EQ(packets_3, 4);
}

TEST_F(TestSegmentModules, ModuleInitError)
{
using namespace modules;
unsigned int packets_1{0};
unsigned int packets_2{0};
unsigned int packets_3{0};

auto init_wrapper = [&packets_1, &packets_2, &packets_3](segment::IBuilder& builder) {
auto simple_mod = builder.make_module<SimpleModule>("ModuleEndToEndTest_mod1");
auto configurable_mod = builder.make_module<ConfigurableModule>("ModuleEndToEndTest_mod2");

auto source1 = builder.make_source<bool>("src1", [](rxcpp::subscriber<bool>& sub) {
if (sub.is_subscribed())
{
sub.on_next(true);
sub.on_next(false);
sub.on_next(true);
sub.on_next(true);
}

sub.on_completed();
});

// Ex1. Partially dynamic edge construction
builder.make_edge(source1, simple_mod->input_port("input1"));

auto source2 = builder.make_source<bool>("src2", [](rxcpp::subscriber<bool>& sub) {
if (sub.is_subscribed())
{
sub.on_next(true);
sub.on_next(false);
sub.on_next(false);
sub.on_next(false);
sub.on_next(true);
sub.on_next(false);
}

sub.on_completed();
});

// Ex2. Dynamic edge construction -- requires type specification
builder.make_edge(source2, simple_mod->input_port("input2"));

auto sink1 = builder.make_sink<std::string>("sink1", [&packets_1](std::string input) {
packets_1++;
VLOG(10) << "Sinking " << input << std::endl;
});

builder.make_edge(simple_mod->output_port("output1"), sink1);

auto sink2 = builder.make_sink<std::string>("sink2", [&packets_2](std::string input) {
packets_2++;
VLOG(10) << "Sinking " << input << std::endl;
});

builder.make_edge(simple_mod->output_port("output2"), sink2);

auto source3 = builder.make_source<bool>("src3", [](rxcpp::subscriber<bool>& sub) {
if (sub.is_subscribed())
{
sub.on_next(true);
sub.on_next(false);
sub.on_next(true);
sub.on_next(true);
}

sub.on_completed();
});

builder.make_edge(source3, configurable_mod->input_port("configurable_input_a"));

auto sink3 = builder.make_sink<std::string>("sink3", [&packets_3](std::string input) {
packets_3++;
VLOG(10) << "Sinking " << input << std::endl;
});

builder.make_edge(configurable_mod->output_port("configurable_output_x"), sink3);

throw std::runtime_error("Test exception");
};

m_pipeline->make_segment("EndToEnd_Segment", init_wrapper);

auto options = std::make_shared<Options>();
options->topology().user_cpuset("0-1");
options->topology().restrict_gpus(true);

Executor executor(options);
executor.register_pipeline(std::move(m_pipeline));
executor.start();
EXPECT_THROW(executor.join(), std::runtime_error);
}

TEST_F(TestSegmentModules, ModuleAsSourceTest)
{
using namespace modules;
Expand Down

0 comments on commit 63de5c0

Please sign in to comment.