Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust transfer methods behaviour when interrupted #308

Merged
merged 4 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ def jobMatrix(String prefix, List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft
def os = spec.os
def compiler = spec.compiler
nodes["${prefix}/${label}"] = {
node(label) {
node(node_tag) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
Expand All @@ -29,7 +33,7 @@ def jobMatrix(String prefix, List specs, Closure callback) {
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /MacOS/) {
if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
Expand Down Expand Up @@ -71,8 +75,7 @@ pipeline{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}
Expand Down
12 changes: 6 additions & 6 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
Expand All @@ -260,7 +260,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
Expand All @@ -270,7 +270,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
Expand All @@ -280,7 +280,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
Expand All @@ -290,7 +290,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -299,7 +299,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
Expand Down
8 changes: 4 additions & 4 deletions fairmq/FairMQDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class FairMQDevice
/// @param channel channel name
/// @param index channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
Expand All @@ -140,7 +140,7 @@ class FairMQDevice
/// @param channel channel name
/// @param index channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
Expand All @@ -151,7 +151,7 @@ class FairMQDevice
/// @param channel channel name
/// @param index channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -162,7 +162,7 @@ class FairMQDevice
/// @param channel channel name
/// @param index channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
Expand Down
19 changes: 18 additions & 1 deletion fairmq/FairMQSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,31 @@
#ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_

#include "FairMQMessage.h"

#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>

#include "FairMQMessage.h"
class FairMQTransportFactory;

namespace fair
{
namespace mq
{

/// Negative status codes that the transfer methods (Send/Receive) return
/// in place of a byte count when no data was transferred.
/// The enumeration is scoped, so a value has to be converted explicitly
/// (e.g. static_cast<int>) before it can be returned as an integer.
enum class TransferResult : int {
    error       = -1, ///< the transfer failed with an error
    timeout     = -2, ///< the transfer timed out
    interrupted = -3  ///< the transfer was interrupted (e.g. by a requested state change)
};

} // namespace mq
} // namespace fair

class FairMQSocket
{
public:
Expand Down
12 changes: 6 additions & 6 deletions fairmq/ofi/Socket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ try {
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}

fSendPushSem.wait();
{
Expand All @@ -284,7 +284,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}

auto Socket::SendQueueReader() -> void
Expand Down Expand Up @@ -431,7 +431,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}

auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
Expand All @@ -449,14 +449,14 @@ try {
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
}
fBytesRx += size;
++fMessagesRx;

return size;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return -1;
return TransferResult::error;
}

auto Socket::RecvControlQueueReader() -> void
Expand Down
2 changes: 1 addition & 1 deletion fairmq/sdk/Topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
}
}

using Duration = std::chrono::milliseconds;
using Duration = std::chrono::microseconds;
using ChangeStateCompletionSignature = void(std::error_code, TopologyState);

private:
Expand Down
44 changes: 26 additions & 18 deletions fairmq/shmem/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class Socket final : public fair::mq::Socket

bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if ((flags & ZMQ_DONTWAIT) == 0) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
Expand All @@ -147,10 +147,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
return static_cast<int>(TransferResult::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
return static_cast<int>(TransferResult::error);
}
}

Expand All @@ -166,7 +166,7 @@ class Socket final : public fair::mq::Socket
ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));

while (true && !fManager.Interrupted()) {
while (true) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
shmMsg->fQueued = true;
Expand All @@ -175,17 +175,19 @@ class Socket final : public fair::mq::Socket
fBytesTx += size;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}

return -1;
return static_cast<int>(TransferResult::error);
}

int Receive(MessagePtr& msg, const int timeout = -1) override
Expand Down Expand Up @@ -218,10 +220,12 @@ class Socket final : public fair::mq::Socket
++fMessagesRx;
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
Expand Down Expand Up @@ -249,7 +253,7 @@ class Socket final : public fair::mq::Socket
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
}

while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
Expand All @@ -267,17 +271,19 @@ class Socket final : public fair::mq::Socket

return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}

return -1;
return static_cast<int>(TransferResult::error);
}

int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
Expand All @@ -290,7 +296,7 @@ class Socket final : public fair::mq::Socket

ZMsg zmqMsg;

while (!fManager.Interrupted()) {
while (true) {
int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
Expand Down Expand Up @@ -321,17 +327,19 @@ class Socket final : public fair::mq::Socket

return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return -2;
return static_cast<int>(TransferResult::timeout);
}
} else {
return HandleErrors();
}
}

return -1;
return static_cast<int>(TransferResult::error);
}

void* GetSocket() const { return fSocket; }
Expand Down Expand Up @@ -498,7 +506,7 @@ class Socket final : public fair::mq::Socket
if (constant == "pollout")
return ZMQ_POLLOUT;

return -1;
throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant));
}

~Socket() override { Close(); }
Expand Down
Loading