Skip to content

Commit

Permalink
redesign TcpTransport.
Browse files Browse the repository at this point in the history
  • Loading branch information
ifplusor committed Oct 25, 2019
1 parent b95fc17 commit 32c7921
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 94 deletions.
1 change: 1 addition & 0 deletions src/transport/EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class BufferEvent : public noncopyable {
}

int enable(short event) { return bufferevent_enable(m_bufferEvent, event); }
int disable(short event) { return bufferevent_disable(m_bufferEvent, event); }

int connect(const struct sockaddr* addr, int socklen) {
return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)addr, socklen);
Expand Down
57 changes: 29 additions & 28 deletions src/transport/TcpRemotingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ RemotingCommand* TcpRemotingClient::invokeSync(const std::string& addr,
doAfterRpcHooks(addr, request, response, false);
return response;
} catch (const RemotingSendRequestException& e) {
LOG_WARN_NEW("invokeSync: send request exception, so close the channel[{}]",
channel->getPeerAddrAndPort().c_str());
LOG_WARN_NEW("invokeSync: send request exception, so close the channel[{}]", channel->getPeerAddrAndPort());
CloseTransport(addr, channel);
throw e;
} catch (const RemotingTimeoutException& e) {
Expand Down Expand Up @@ -297,7 +296,7 @@ void TcpRemotingClient::invokeOneway(const std::string& addr, RemotingCommand& r
doBeforeRpcHooks(addr, request, true);
invokeOnewayImpl(channel, request);
} catch (const RemotingSendRequestException& e) {
LOG_WARN("invokeOneway: send request exception, so close the channel[%s]", channel->getPeerAddrAndPort().c_str());
LOG_WARN("invokeOneway: send request exception, so close the channel[{}]", channel->getPeerAddrAndPort());
CloseTransport(addr, channel);
throw e;
}
Expand Down Expand Up @@ -351,8 +350,8 @@ TcpTransportPtr TcpRemotingClient::CreateTransport(const std::string& addr, bool
TcpTransportPtr channel;

{
// try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
// long time, if could not get m_tcpLock, return NULL
// try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long time,
// if could not get m_transportTableMutex, return NULL
if (!UtilAll::try_lock_for(m_transportTableMutex, 1000 * m_tcpTransportTryLockTimeout)) {
LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
return TcpTransportPtr();
Expand All @@ -362,34 +361,36 @@ TcpTransportPtr TcpRemotingClient::CreateTransport(const std::string& addr, bool
// check for reuse
auto iter = m_transportTable.find(addr);
if (iter != m_transportTable.end()) {
TcpTransportPtr tcp = iter->second;
if (tcp != nullptr) {
TcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
channel = iter->second;
if (channel != nullptr) {
TcpConnectStatus connectStatus = channel->getTcpConnectStatus();
switch (connectStatus) {
case TCP_CONNECT_STATUS_SUCCESS:
return tcp;
case TCP_CONNECT_STATUS_WAIT:
case TCP_CONNECT_STATUS_CONNECTED:
return channel;
case TCP_CONNECT_STATUS_CONNECTING:
// wait server answer, return dummy
return TcpTransportPtr();
case TCP_CONNECT_STATUS_FAILED:
LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
tcp->disconnect(addr); // avoid coredump when connection with broker was broken
LOG_ERROR_NEW("tcpTransport with server disconnected, erase server:{}", addr);
channel->disconnect(addr); // avoid coredump when connection with broker was broken
m_transportTable.erase(addr);
break;
default:
LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
LOG_ERROR_NEW("go to CLOSED state, erase:{} from transportTable, and reconnect it", addr);
m_transportTable.erase(addr);
break;
}
}
}

// callback;
// choose callback
TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::MessageReceived : nullptr;

// create new transport, then connect server
channel = TcpTransport::CreateTransport(this, callback);
TcpConnectStatus connectStatus = channel->connect(addr, 0); // use non-block
if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
LOG_WARN("can not connect to:%s", addr.c_str());
if (connectStatus != TCP_CONNECT_STATUS_CONNECTING) {
LOG_WARN("can not connect to:{}", addr);
channel->disconnect(addr);
return TcpTransportPtr();
} else {
Expand All @@ -398,8 +399,9 @@ TcpTransportPtr TcpRemotingClient::CreateTransport(const std::string& addr, bool
}
}

// waiting...
TcpConnectStatus connectStatus = channel->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout));
if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
if (connectStatus != TCP_CONNECT_STATUS_CONNECTED) {
LOG_WARN("can not connect to server:%s", addr.c_str());
channel->disconnect(addr);
return TcpTransportPtr();
Expand Down Expand Up @@ -450,35 +452,34 @@ bool TcpRemotingClient::CloseTransport(const std::string& addr, TcpTransportPtr
}

if (!UtilAll::try_lock_for(m_transportTableMutex, 1000 * m_tcpTransportTryLockTimeout)) {
LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
LOG_ERROR_NEW("CloseTransport of:{} get timed_mutex timeout", addr);
return false;
}
std::lock_guard<std::timed_mutex> lock(m_transportTableMutex, std::adopt_lock);

LOG_ERROR("CloseTransport of:%s", addr.c_str());
LOG_ERROR_NEW("CloseTransport of:{}", addr);

bool removeItemFromTable = true;
if (m_transportTable.find(addr) != m_transportTable.end()) {
if (m_transportTable[addr]->getStartTime() != channel->getStartTime()) {
LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do",
addr.c_str());
LOG_INFO_NEW("tcpTransport with addr:{} has been closed before, and has been created again, nothing to do", addr);
removeItemFromTable = false;
}
} else {
LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
LOG_INFO_NEW("tcpTransport with addr:{} had been removed from tcpTable before", addr);
removeItemFromTable = false;
}

if (removeItemFromTable) {
LOG_WARN("closeTransport: disconnect:%s with state:%d", addr.c_str(),
m_transportTable[addr]->getTcpConnectStatus());
if (m_transportTable[addr]->getTcpConnectStatus() == TCP_CONNECT_STATUS_SUCCESS)
LOG_WARN_NEW("closeTransport: disconnect:{} with state:{}", addr, m_transportTable[addr]->getTcpConnectStatus());
if (m_transportTable[addr]->getTcpConnectStatus() != TCP_CONNECT_STATUS_CLOSED) {
m_transportTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken
LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
}
LOG_WARN_NEW("closeTransport: erase broker: {}", addr);
m_transportTable.erase(addr);
}

LOG_ERROR("CloseTransport of:%s end", addr.c_str());
LOG_ERROR_NEW("CloseTransport of:{} end", addr);

return removeItemFromTable;
}
Expand Down
117 changes: 59 additions & 58 deletions src/transport/TcpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace rocketmq {

TcpTransport::TcpTransport(TcpRemotingClient* client, TcpTransportReadCallback callback)
: m_event(nullptr),
m_tcpConnectStatus(TCP_CONNECT_STATUS_INIT),
m_tcpConnectStatus(TCP_CONNECT_STATUS_CREATED),
m_statusMutex(),
m_statusEvent(),
m_readCallback(callback),
Expand All @@ -42,48 +42,53 @@ TcpTransport::TcpTransport(TcpRemotingClient* client, TcpTransportReadCallback c
}

TcpTransport::~TcpTransport() {
freeBufferEvent();
closeBufferEvent();
m_readCallback = nullptr;
}

void TcpTransport::freeBufferEvent() {
// freeBufferEvent is idempotent.

// first, unlink BufferEvent
if (m_event != nullptr) {
m_event->setCallback(nullptr, nullptr, nullptr, nullptr);
TcpConnectStatus TcpTransport::closeBufferEvent() {
// closeBufferEvent is idempotent.
if (setTcpConnectEvent(TCP_CONNECT_STATUS_CLOSED) != TCP_CONNECT_STATUS_CLOSED) {
if (m_event != nullptr) {
m_event->disable(EV_READ | EV_WRITE);
// FIXME: not close the socket!!!
}
}

// then, release BufferEvent
m_event.reset();
}

void TcpTransport::setTcpConnectStatus(TcpConnectStatus connectStatus) {
m_tcpConnectStatus = connectStatus;
return TCP_CONNECT_STATUS_CLOSED;
}

TcpConnectStatus TcpTransport::getTcpConnectStatus() {
return m_tcpConnectStatus;
}

TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
if (m_tcpConnectStatus == TCP_CONNECT_STATUS_CONNECTING) {
std::unique_lock<std::mutex> lock(m_statusMutex);
if (!m_statusEvent.wait_for(lock, std::chrono::milliseconds(timeoutMillis),
[&] { return m_tcpConnectStatus != TCP_CONNECT_STATUS_WAIT; })) {
[&] { return m_tcpConnectStatus != TCP_CONNECT_STATUS_CONNECTING; })) {
LOG_INFO("connect timeout");
}
}
return m_tcpConnectStatus;
}

// internal method
void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
TcpConnectStatus TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
TcpConnectStatus oldStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
if (oldStatus == TCP_CONNECT_STATUS_WAIT) {
if (oldStatus == TCP_CONNECT_STATUS_CONNECTING) {
// awake waiting thread
m_statusEvent.notify_all();
}
return oldStatus;
}

bool TcpTransport::setTcpConnectEventIf(TcpConnectStatus& expectStatus, TcpConnectStatus connectStatus) {
bool ret = m_tcpConnectStatus.compare_exchange_strong(expectStatus, connectStatus);
if (expectStatus == TCP_CONNECT_STATUS_CONNECTING) {
// awake waiting thread
m_statusEvent.notify_all();
}
return ret;
}

u_long TcpTransport::resolveInetAddr(std::string& hostname) {
Expand All @@ -98,28 +103,23 @@ u_long TcpTransport::resolveInetAddr(std::string& hostname) {

void TcpTransport::disconnect(const std::string& addr) {
// disconnect is idempotent.
std::lock_guard<std::mutex> lock(m_eventMutex);
if (getTcpConnectStatus() != TCP_CONNECT_STATUS_INIT) {
LOG_INFO("disconnect:%s start. event:%p", addr.c_str(), (void*)m_event.get());
freeBufferEvent();
setTcpConnectEvent(TCP_CONNECT_STATUS_INIT);
LOG_INFO("disconnect:%s completely", addr.c_str());
}
LOG_INFO_NEW("disconnect:{} start. event:{}", addr, (void*)m_event.get());
closeBufferEvent();
LOG_INFO_NEW("disconnect:{} completely", addr);
}

TcpConnectStatus TcpTransport::connect(const std::string& strServerURL, int timeoutMillis) {
std::string hostname;
short port;

LOG_DEBUG("connect to [%s].", strServerURL.c_str());
LOG_DEBUG("connect to [{}].", strServerURL);
if (!UtilAll::SplitURL(strServerURL, hostname, port)) {
LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
return TCP_CONNECT_STATUS_FAILED;
LOG_INFO("connect to [{}] failed, Invalid url.", strServerURL);
return closeBufferEvent();
}

{
std::lock_guard<std::mutex> lock(m_eventMutex);

TcpConnectStatus curStatus = TCP_CONNECT_STATUS_CREATED;
if (setTcpConnectEventIf(curStatus, TCP_CONNECT_STATUS_CONNECTING)) {
// TODO: support ipv6
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
Expand All @@ -130,40 +130,35 @@ TcpConnectStatus TcpTransport::connect(const std::string& strServerURL, int time
// create BufferEvent
m_event.reset(EventLoop::GetDefaultEventLoop()->createBufferEvent(-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE));
if (nullptr == m_event) {
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
LOG_ERROR_NEW("create BufferEvent failed");
return closeBufferEvent();
}

// then, configure BufferEvent
m_event->setCallback(ReadCallback, nullptr, EventCallback, shared_from_this());
m_event->setWatermark(EV_READ, 4, 0);
m_event->enable(EV_READ | EV_WRITE);

setTcpConnectStatus(TCP_CONNECT_STATUS_WAIT);
if (m_event->connect((struct sockaddr*)&sin, sizeof(sin)) < 0) {
LOG_INFO("connect to fd:%d failed", m_event->getfd());
freeBufferEvent();
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
LOG_WARN_NEW("connect to fd:{} failed", m_event->getfd());
return closeBufferEvent();
}
} else {
return curStatus;
}

if (timeoutMillis <= 0) {
LOG_INFO("try to connect to fd:%d, addr:%s", m_event->getfd(), hostname.c_str());
return TCP_CONNECT_STATUS_WAIT;
LOG_INFO_NEW("try to connect to fd:{}, addr:{}", m_event->getfd(), hostname);
return TCP_CONNECT_STATUS_CONNECTING;
}

TcpConnectStatus connectStatus = waitTcpConnectEvent(timeoutMillis);
if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
LOG_WARN("can not connect to server:%s", strServerURL.c_str());

std::lock_guard<std::mutex> lock(m_eventMutex);
freeBufferEvent();
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
return TCP_CONNECT_STATUS_FAILED;
if (connectStatus != TCP_CONNECT_STATUS_CONNECTED) {
LOG_WARN_NEW("can not connect to server:{}", strServerURL);
return closeBufferEvent();
}

return TCP_CONNECT_STATUS_SUCCESS;
return TCP_CONNECT_STATUS_CONNECTED;
}

void TcpTransport::EventCallback(BufferEvent* event, short what, TcpTransport* transport) {
Expand All @@ -175,12 +170,20 @@ void TcpTransport::EventCallback(BufferEvent* event, short what, TcpTransport* t
// disable Nagle
int val = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&val, sizeof(val));
transport->setTcpConnectEvent(TCP_CONNECT_STATUS_SUCCESS);

TcpConnectStatus curStatus = TCP_CONNECT_STATUS_CONNECTING;
transport->setTcpConnectEventIf(curStatus, TCP_CONNECT_STATUS_CONNECTED);
} else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING | BEV_EVENT_WRITING)) {
LOG_INFO("eventcb: received error event cb:%x on fd:%d", what, fd);
// if error, stop callback.
event->setCallback(nullptr, nullptr, nullptr, nullptr);
transport->setTcpConnectEvent(TCP_CONNECT_STATUS_FAILED);
TcpConnectStatus curStatus = transport->getTcpConnectStatus();
while (curStatus != TCP_CONNECT_STATUS_CLOSED && curStatus != TCP_CONNECT_STATUS_FAILED) {
if (transport->setTcpConnectEventIf(curStatus, TCP_CONNECT_STATUS_FAILED)) {
event->setCallback(nullptr, nullptr, nullptr, nullptr);
break;
}
curStatus = transport->getTcpConnectStatus();
}
} else {
LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd);
}
Expand Down Expand Up @@ -245,22 +248,20 @@ void TcpTransport::messageReceived(MemoryBlockPtr3& mem, const std::string& addr
}
}

bool TcpTransport::sendMessage(const char* pData, size_t len) {
std::lock_guard<std::mutex> lock(m_eventMutex);
if (getTcpConnectStatus() != TCP_CONNECT_STATUS_SUCCESS) {
bool TcpTransport::sendMessage(const char* data, size_t len) {
if (getTcpConnectStatus() != TCP_CONNECT_STATUS_CONNECTED) {
return false;
}

/* NOTE:
do not need to consider large data which could not send by once, as
bufferevent could handle this case;
*/
return m_event != nullptr && m_event->write(pData, len) == 0;
return m_event != nullptr && m_event->write(data, len) == 0;
}

const std::string TcpTransport::getPeerAddrAndPort() {
std::lock_guard<std::mutex> lock(m_eventMutex);
return m_event ? m_event->getPeerAddrPort() : "";
return m_event != nullptr ? m_event->getPeerAddrPort() : "";
}

const uint64_t TcpTransport::getStartTime() const {
Expand Down
Loading

0 comments on commit 32c7921

Please sign in to comment.