From e4c8d2b0c367713e2eabde8e3853dccba03ff8c7 Mon Sep 17 00:00:00 2001 From: Stefan Eilemann Date: Wed, 22 Jan 2014 15:01:29 +0100 Subject: [PATCH 1/3] Implement Unix side of #38: make connectionset round-robin --- co/connectionSet.cpp | 43 ++++++++------ tests/CMakeLists.txt | 2 +- tests/issue38.cpp | 130 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 18 deletions(-) create mode 100644 tests/issue38.cpp diff --git a/co/connectionSet.cpp b/co/connectionSet.cpp index 12dbe8c07..5b0d2b891 100644 --- a/co/connectionSet.cpp +++ b/co/connectionSet.cpp @@ -1,5 +1,5 @@ -/* Copyright (c) 2005-2013, Stefan Eilemann +/* Copyright (c) 2005-2014, Stefan Eilemann * 2011-2012, Daniel Nachbaur * * This file is part of Collage @@ -367,6 +367,7 @@ void ConnectionSet::_clear() ConnectionSet::Event ConnectionSet::select( const uint32_t timeout ) { LB_TS_SCOPED( _selectThread ); + while( true ) { _impl->connection = 0; @@ -377,6 +378,13 @@ ConnectionSet::Event ConnectionSet::select( const uint32_t timeout ) _impl->thread->event = EVENT_NONE; // unblock previous thread _impl->thread = 0; } +#else + if( !_impl->dirty ) // #38: check results from previous poll() + { + const Event event = _getSelectResult( 0 ); + if( event != EVENT_NONE ) + return event; + } #endif if( !_setupFDSet( )) @@ -421,22 +429,22 @@ ConnectionSet::Event ConnectionSet::select( const uint32_t timeout ) return EVENT_SELECT_ERROR; default: // SUCCESS + { + const Event event = _getSelectResult( ret ); + + if( event == EVENT_NONE ) + break; + + if( _impl->connection == _impl->selfConnection.get( )) { - Event event = _getSelectResult( ret ); - - if( event == EVENT_NONE ) - break; - - if( _impl->connection == _impl->selfConnection.get( )) - { - _impl->connection = 0; - _impl->selfConnection->reset(); - return EVENT_INTERRUPT; - } - if( event == EVENT_DATA && _impl->connection->isListening()) - event = EVENT_CONNECT; - return event; + _impl->connection = 0; + _impl->selfConnection->reset(); + return EVENT_INTERRUPT; } + if( event == EVENT_DATA && _impl->connection->isListening( )) + return EVENT_CONNECT; + return event; + } } } } @@ -476,7 +484,7 @@ ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t ) { for( size_t i = 0; i < _impl->fdSet.getSize(); ++i ) { - const pollfd& pollFD = _impl->fdSet[i]; + pollfd& pollFD = _impl->fdSet[i]; if( pollFD.revents == 0 ) continue; @@ -484,7 +492,8 @@ ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t ) LBASSERT( pollFD.fd > 0 ); _impl->connection = _impl->fdSetResult[i].connection; - LBASSERT( _impl->connection.isValid( )); + pollFD.revents = 0; + LBASSERT( _impl->connection ); LBVERB << "Got event on connection @" << (void*)_impl->connection.get() << std::endl; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5327b1fa3..22ace2ea3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,6 +1,6 @@ # Copyright (c) 2010-2013, Stefan Eilemann # -# Change this number when adding tests to force a CMake run: 5 +# Change this number when adding tests to force a CMake run: 6 include(InstallFiles) diff --git a/tests/issue38.cpp b/tests/issue38.cpp new file mode 100644 index 000000000..a05a8d873 --- /dev/null +++ b/tests/issue38.cpp @@ -0,0 +1,130 @@ + +/* Copyright (c) 2010-2014, Stefan Eilemann + * + * This library is free software; you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License version 2.1 as published + * by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +// Tests basic connection functionality +#include +#include +#include +#include +#include +#include + +#include +#include + +#define PACKETSIZE (12345) +#define NPACKETS (23456) +#define NCONNECTIONS (10) + +namespace +{ +uint8_t out[ PACKETSIZE ]; + +class Writer : public lunchbox::Thread +{ +public: + Writer() + : runtime( 0.f ) + {} + + void start( co::ConnectionPtr connection ) + { + connection_ = connection; + TEST( lunchbox::Thread::start( )); + } + + void run() override + { + lunchbox::Clock clock; + for( size_t j = 0; j < NPACKETS; ++j ) + TEST( connection_->send( out, PACKETSIZE )); + runtime = clock.getTimef(); + connection_ = 0; + } + + float runtime; + +private: + co::ConnectionPtr connection_; +}; + +} + +int main( int argc, char **argv ) +{ + co::init( argc, argv ); + co::ConnectionDescriptionPtr desc = new co::ConnectionDescription; + desc->setHostname( "127.0.0.1" ); + + co::ConnectionPtr listener = co::Connection::create( desc ); + TEST( listener ); + TEST( listener->listen( )); + + co::ConnectionPtr writers[ NCONNECTIONS ]; + Writer threads[ NCONNECTIONS ]; + co::ConnectionSet set; + + for( size_t i = 0; i < NCONNECTIONS; ++i ) + { + listener->acceptNB(); + + writers[i] = co::Connection::create( desc ); + TEST( writers[i]->connect( )); + + co::ConnectionPtr reader = listener->acceptSync(); + TEST( reader ); + + set.addConnection( reader ); + threads[i].start( writers[i] ); + } + + co::Buffer buffer; + co::BufferPtr syncBuffer; + + for( size_t i = 0; i < NPACKETS * NCONNECTIONS ; ++i ) + { + const co::ConnectionSet::Event result = set.select(); + TESTINFO( result == co::ConnectionSet::EVENT_DATA, result ); + + co::ConnectionPtr connection = set.getConnection(); + connection->recvNB( &buffer, PACKETSIZE ); + TEST( connection->recvSync( syncBuffer )); + TEST( syncBuffer == &buffer ); + TEST( buffer.getSize() == PACKETSIZE ); + buffer.setSize( 0 ); + } + + const float runtime = threads[0].runtime; + for( size_t i = 0; i < NCONNECTIONS; ++i ) + { + threads[i].join(); + writers[i]->close(); + TESTINFO( std::abs( threads[i].runtime - runtime ) < 10.f , + threads[i].runtime << " != " << runtime ); + } + + const co::Connections& connections = set.getConnections(); + while( !connections.empty( )) + { + co::ConnectionPtr connection = connections.back(); + connection->close(); + TEST( set.removeConnection( connection )); + } + + co::exit(); + return EXIT_SUCCESS; +} From f347b3f576ed2b7f493473537f9535d110393b66 Mon Sep 17 00:00:00 2001 From: Stefan Eilemann Date: Wed, 22 Jan 2014 15:24:46 +0100 Subject: [PATCH 2/3] Fix OS X build, make #38 test more robust --- CMake/GitExternal.cmake | 4 ++-- tests/issue38.cpp | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/CMake/GitExternal.cmake b/CMake/GitExternal.cmake index 90e988462..ebf107594 100644 --- a/CMake/GitExternal.cmake +++ b/CMake/GitExternal.cmake @@ -13,7 +13,7 @@ function(GIT_EXTERNAL DIR REPO TAG) get_filename_component(GIT_EXTERNAL_DIR "${DIR}/.." ABSOLUTE) if(NOT EXISTS "${DIR}") - message("git clone ${REPO} ${DIR}") + message(STATUS "git clone ${REPO} ${DIR}") execute_process( COMMAND "${GIT_EXECUTABLE}" clone "${REPO}" "${DIR}" RESULT_VARIABLE nok ERROR_VARIABLE error @@ -39,7 +39,7 @@ function(GIT_EXTERNAL DIR REPO TAG) RESULT_VARIABLE nok ERROR_VARIABLE error WORKING_DIRECTORY "${DIR}") if(nok) - message(WARNING "Update of ${DIR} failed: ${error}\n") + message(STATUS "Update of ${DIR} failed:\n ${error}") endif() execute_process( diff --git a/tests/issue38.cpp b/tests/issue38.cpp index a05a8d873..723f6b3b6 100644 --- a/tests/issue38.cpp +++ b/tests/issue38.cpp @@ -41,10 +41,10 @@ class Writer : public lunchbox::Thread : runtime( 0.f ) {} - void start( co::ConnectionPtr connection ) + void startSend( co::ConnectionPtr connection ) { connection_ = connection; - TEST( lunchbox::Thread::start( )); + TEST( start( )); } void run() override @@ -89,7 +89,7 @@ int main( int argc, char **argv ) TEST( reader ); set.addConnection( reader ); - threads[i].start( writers[i] ); + threads[i].startSend( writers[i] ); } co::Buffer buffer; @@ -109,11 +109,12 @@ int main( int argc, char **argv ) } const float runtime = threads[0].runtime; + const float delta = runtime / 10.f; for( size_t i = 0; i < NCONNECTIONS; ++i ) { threads[i].join(); writers[i]->close(); - TESTINFO( std::abs( threads[i].runtime - runtime ) < 10.f , + TESTINFO( std::abs( threads[i].runtime - runtime ) < delta , threads[i].runtime << " != " << runtime ); } From 0b3040e509e0fae8fee555258df8978dc6947471 Mon Sep 17 00:00:00 2001 From: Stefan Eilemann Date: Thu, 23 Jan 2014 08:52:41 +0100 Subject: [PATCH 3/3] Update Travis config --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index dffe6c67e..2e5775705 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,13 +7,13 @@ script: - mkdir Debug - cd Debug - cmake .. -DCI_BUILD_COMMIT=$TRAVIS_COMMIT -DCMAKE_BUILD_TYPE=Debug -DTRAVIS=1 - - env TRAVIS=1 make -j8 cis ARGS=-V + - env TRAVIS=1 make -j2 cis ARGS=-V - mkdir ../Release - git status - git --no-pager diff - cd ../Release - cmake .. -DCI_BUILD_COMMIT=$TRAVIS_COMMIT -DCMAKE_BUILD_TYPE=Release -DTRAVIS=1 - - env TRAVIS=1 make -j8 cis ARGS=-V + - env TRAVIS=1 make -j2 cis ARGS=-V - git status - git --no-pager diff before_install: