Skip to content

Commit

Permalink
Merge pull request #76 from eile/master
Browse files Browse the repository at this point in the history
Implement Unix side of #38: make connectionset round-robiny
  • Loading branch information
tribal-tec committed Jan 23, 2014
2 parents 10d51d3 + 0b3040e commit 8cd90e5
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ script:
- mkdir Debug
- cd Debug
- cmake .. -DCI_BUILD_COMMIT=$TRAVIS_COMMIT -DCMAKE_BUILD_TYPE=Debug -DTRAVIS=1
- env TRAVIS=1 make -j8 cis ARGS=-V
- env TRAVIS=1 make -j2 cis ARGS=-V
- mkdir ../Release
- git status
- git --no-pager diff
- cd ../Release
- cmake .. -DCI_BUILD_COMMIT=$TRAVIS_COMMIT -DCMAKE_BUILD_TYPE=Release -DTRAVIS=1
- env TRAVIS=1 make -j8 cis ARGS=-V
- env TRAVIS=1 make -j2 cis ARGS=-V
- git status
- git --no-pager diff
before_install:
Expand Down
4 changes: 2 additions & 2 deletions CMake/GitExternal.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function(GIT_EXTERNAL DIR REPO TAG)
get_filename_component(GIT_EXTERNAL_DIR "${DIR}/.." ABSOLUTE)

if(NOT EXISTS "${DIR}")
message("git clone ${REPO} ${DIR}")
message(STATUS "git clone ${REPO} ${DIR}")
execute_process(
COMMAND "${GIT_EXECUTABLE}" clone "${REPO}" "${DIR}"
RESULT_VARIABLE nok ERROR_VARIABLE error
Expand All @@ -39,7 +39,7 @@ function(GIT_EXTERNAL DIR REPO TAG)
RESULT_VARIABLE nok ERROR_VARIABLE error
WORKING_DIRECTORY "${DIR}")
if(nok)
message(WARNING "Update of ${DIR} failed: ${error}\n")
message(STATUS "Update of ${DIR} failed:\n ${error}")
endif()

execute_process(
Expand Down
43 changes: 26 additions & 17 deletions co/connectionSet.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

/* Copyright (c) 2005-2013, Stefan Eilemann <[email protected]>
/* Copyright (c) 2005-2014, Stefan Eilemann <[email protected]>
* 2011-2012, Daniel Nachbaur <[email protected]>
*
* This file is part of Collage <https://github.com/Eyescale/Collage>
Expand Down Expand Up @@ -367,6 +367,7 @@ void ConnectionSet::_clear()
ConnectionSet::Event ConnectionSet::select( const uint32_t timeout )
{
LB_TS_SCOPED( _selectThread );

while( true )
{
_impl->connection = 0;
Expand All @@ -377,6 +378,13 @@ ConnectionSet::Event ConnectionSet::select( const uint32_t timeout )
_impl->thread->event = EVENT_NONE; // unblock previous thread
_impl->thread = 0;
}
#else
if( !_impl->dirty ) // #38: check results from previous poll()
{
const Event event = _getSelectResult( 0 );
if( event != EVENT_NONE )
return event;
}
#endif

if( !_setupFDSet( ))
Expand Down Expand Up @@ -421,22 +429,22 @@ ConnectionSet::Event ConnectionSet::select( const uint32_t timeout )
return EVENT_SELECT_ERROR;

default: // SUCCESS
{
const Event event = _getSelectResult( ret );

if( event == EVENT_NONE )
break;

if( _impl->connection == _impl->selfConnection.get( ))
{
Event event = _getSelectResult( ret );

if( event == EVENT_NONE )
break;

if( _impl->connection == _impl->selfConnection.get( ))
{
_impl->connection = 0;
_impl->selfConnection->reset();
return EVENT_INTERRUPT;
}
if( event == EVENT_DATA && _impl->connection->isListening())
event = EVENT_CONNECT;
return event;
_impl->connection = 0;
_impl->selfConnection->reset();
return EVENT_INTERRUPT;
}
if( event == EVENT_DATA && _impl->connection->isListening( ))
return EVENT_CONNECT;
return event;
}
}
}
}
Expand Down Expand Up @@ -476,15 +484,16 @@ ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t )
{
for( size_t i = 0; i < _impl->fdSet.getSize(); ++i )
{
const pollfd& pollFD = _impl->fdSet[i];
pollfd& pollFD = _impl->fdSet[i];
if( pollFD.revents == 0 )
continue;

const int pollEvents = pollFD.revents;
LBASSERT( pollFD.fd > 0 );

_impl->connection = _impl->fdSetResult[i].connection;
LBASSERT( _impl->connection.isValid( ));
pollFD.revents = 0;
LBASSERT( _impl->connection );

LBVERB << "Got event on connection @" << (void*)_impl->connection.get()
<< std::endl;
Expand Down
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2010-2013, Stefan Eilemann <[email protected]>
#
# Change this number when adding tests to force a CMake run: 5
# Change this number when adding tests to force a CMake run: 6

include(InstallFiles)

Expand Down
131 changes: 131 additions & 0 deletions tests/issue38.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@

/* Copyright (c) 2010-2014, Stefan Eilemann <[email protected]>
*
* This library is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License version 2.1 as published
* by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

// Tests basic connection functionality
#include <test.h>
#include <co/buffer.h>
#include <co/connection.h>
#include <co/connectionDescription.h>
#include <co/connectionSet.h>
#include <co/init.h>

#include <lunchbox/monitor.h>
#include <iostream>

#define PACKETSIZE (12345)
#define NPACKETS (23456)
#define NCONNECTIONS (10)

namespace
{
uint8_t out[ PACKETSIZE ];

class Writer : public lunchbox::Thread
{
public:
Writer()
: runtime( 0.f )
{}

void startSend( co::ConnectionPtr connection )
{
connection_ = connection;
TEST( start( ));
}

void run() override
{
lunchbox::Clock clock;
for( size_t j = 0; j < NPACKETS; ++j )
TEST( connection_->send( out, PACKETSIZE ));
runtime = clock.getTimef();
connection_ = 0;
}

float runtime;

private:
co::ConnectionPtr connection_;
};

}

int main( int argc, char **argv )
{
co::init( argc, argv );
co::ConnectionDescriptionPtr desc = new co::ConnectionDescription;
desc->setHostname( "127.0.0.1" );

co::ConnectionPtr listener = co::Connection::create( desc );
TEST( listener );
TEST( listener->listen( ));

co::ConnectionPtr writers[ NCONNECTIONS ];
Writer threads[ NCONNECTIONS ];
co::ConnectionSet set;

for( size_t i = 0; i < NCONNECTIONS; ++i )
{
listener->acceptNB();

writers[i] = co::Connection::create( desc );
TEST( writers[i]->connect( ));

co::ConnectionPtr reader = listener->acceptSync();
TEST( reader );

set.addConnection( reader );
threads[i].startSend( writers[i] );
}

co::Buffer buffer;
co::BufferPtr syncBuffer;

for( size_t i = 0; i < NPACKETS * NCONNECTIONS ; ++i )
{
const co::ConnectionSet::Event result = set.select();
TESTINFO( result == co::ConnectionSet::EVENT_DATA, result );

co::ConnectionPtr connection = set.getConnection();
connection->recvNB( &buffer, PACKETSIZE );
TEST( connection->recvSync( syncBuffer ));
TEST( syncBuffer == &buffer );
TEST( buffer.getSize() == PACKETSIZE );
buffer.setSize( 0 );
}

const float runtime = threads[0].runtime;
const float delta = runtime / 10.f;
for( size_t i = 0; i < NCONNECTIONS; ++i )
{
threads[i].join();
writers[i]->close();
TESTINFO( std::abs( threads[i].runtime - runtime ) < delta ,
threads[i].runtime << " != " << runtime );
}

const co::Connections& connections = set.getConnections();
while( !connections.empty( ))
{
co::ConnectionPtr connection = connections.back();
connection->close();
TEST( set.removeConnection( connection ));
}

co::exit();
return EXIT_SUCCESS;
}

0 comments on commit 8cd90e5

Please sign in to comment.