Skip to content

Commit

Permalink
Merge pull request #232 from openziti/close-ziti-conn-on-timeout
Browse files Browse the repository at this point in the history
send StateClosed for timedout connections
  • Loading branch information
ekoby authored Feb 5, 2021
2 parents adfe272 + 648aeb7 commit aff3dde
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
19 changes: 10 additions & 9 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i
header_t header;
header_init(&header, ch->msg_seq++);

CH_LOG(TRACE, "=> ct[%x] seq[%d] len[%d]", content, header.seq, body_len);
CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", content, header.seq, body_len);
header.content = content;
header.body_len = body_len;

Expand Down Expand Up @@ -359,7 +359,7 @@ ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *h
header_t header;
header_init(&header, ch->msg_seq++);

CH_LOG(TRACE, "=> ct[%x] seq[%d] len[%d]", ch->id, content, header.seq, body_len);
CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", ch->id, content, header.seq, body_len);
header.content = content;
header.body_len = body_len;

Expand Down Expand Up @@ -464,26 +464,27 @@ static void dispatch_message(ziti_channel_t *ch, message *m) {
return;
}

CH_LOG(ERROR, "received unexpected message[ct=%X] in Connecting state", m->header.content);
CH_LOG(ERROR, "received unexpected message ct[%04X] in Connecting state", m->header.content);
}

if (is_edge(m->header.content)) {
int32_t conn_id;
bool has_conn_id = message_get_int32_header(m, ConnIdHeader, &conn_id);

if (!has_conn_id) {
CH_LOG(ERROR, "received message without conn_id ct[%d]", m->header.content);
CH_LOG(ERROR, "received message without conn_id ct[%04X]", m->header.content);
}
else {
struct msg_receiver *conn = find_receiver(ch, conn_id);
if (conn == NULL) {
CH_LOG(DEBUG, "received message for unknown connection conn_id[%d] ct[%d]", conn_id, m->header.content);
CH_LOG(DEBUG, "received message for unknown connection conn_id[%d] ct[%04X]", conn_id,
m->header.content);
} else {
conn->receive(conn->receiver, m, ZITI_OK);
}
}
} else {
CH_LOG(WARN, "unsupported content type [%d]", m->header.content);
CH_LOG(WARN, "unsupported content type [%04X]", m->header.content);
}
}

Expand Down Expand Up @@ -519,7 +520,7 @@ static void process_inbound(ziti_channel_t *ch) {

ch->in_body_offset = 0;

CH_LOG(TRACE, "<= ct[%x] seq[%d] len[%d] hdrs[%d]", ch->in_next->header.content,
CH_LOG(TRACE, "<= ct[%04X] seq[%d] len[%d] hdrs[%d]", ch->in_next->header.content,
ch->in_next->header.seq,
ch->in_next->header.body_len, ch->in_next->header.headers_len);
}
Expand Down Expand Up @@ -570,7 +571,7 @@ static void latency_reply_cb(void *ctx, message *reply, int err) {
CH_LOG(VERBOSE, "latency is now %ld", ch->latency);
}
else {
CH_LOG(WARN, "invalid latency probe result ct[%d]", reply->header.content);
CH_LOG(WARN, "invalid latency probe result ct[%04X]", reply->header.content);
}
uv_timer_start(&ch->timer, send_latency_probe, LATENCY_INTERVAL, 0);
}
Expand Down Expand Up @@ -611,7 +612,7 @@ static void hello_reply_cb(void *ctx, message *msg, int err) {
message_get_bool_header(msg, ResultSuccessHeader, &success);
}
else if (msg) {
CH_LOG(ERROR, "unexpected Hello response ct[%x]", msg->header.content);
CH_LOG(ERROR, "unexpected Hello response ct[%04X]", msg->header.content);
cb_code = ZITI_GATEWAY_UNAVAILABLE;
}
else {
Expand Down
23 changes: 16 additions & 7 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ static void process_edge_message(struct ziti_conn *conn, message *msg, int code)

static int ziti_channel_start_connection(struct ziti_conn *conn);

static int ziti_disconnect(ziti_connection conn);

static void free_handle(uv_handle_t *h) {
free(h);
}

const char* ziti_conn_state(ziti_connection conn) {
const char *ziti_conn_state(ziti_connection conn) {
return conn ? conn_state_str[conn->state] : "<NULL>";
}

Expand Down Expand Up @@ -282,7 +284,7 @@ static void on_channel_connected(ziti_channel_t *ch, void *ctx, int status) {
static void complete_conn_req(struct ziti_conn *conn, int code) {
if (conn->conn_req && conn->conn_req->cb) {
if (code != ZITI_OK) {
conn_set_state(conn, Disconnected);
conn_set_state(conn, code == ZITI_TIMEOUT ? Timedout : Disconnected);
conn->conn_req->failed = true;
}
conn->conn_req->cb(conn, code);
Expand All @@ -299,9 +301,15 @@ static void connect_timeout(uv_timer_t *timer) {
struct ziti_conn *conn = timer->data;

if (conn->state == Connecting) {
CONN_LOG(WARN, "ziti connection timed out");
conn_set_state(conn, Timedout);
if (conn->channel == NULL) {
CONN_LOG(WARN, "connect timeout: no suitable edge router");
}
else {
CONN_LOG(WARN, "connect timeout: failed to establish connection in %d seconds", conn->timeout);
}
complete_conn_req(conn, ZITI_TIMEOUT);
ziti_disconnect(conn);

}
else {
CONN_LOG(ERROR, "timeout in unexpected state[%s]", ziti_conn_state(conn));
Expand Down Expand Up @@ -496,7 +504,7 @@ static int do_ziti_dial(ziti_connection conn, const char *service, ziti_dial_opt
NEWP(async_cr, uv_async_t);
uv_async_init(conn->ziti_ctx->loop, async_cr, ziti_connect_async);

conn->flusher = calloc(1, sizeof(uv_async_t));
conn->flusher = calloc(1, sizeof(uv_check_t));
uv_check_init(conn->ziti_ctx->loop, conn->flusher);
conn->flusher->data = conn;
uv_unref((uv_handle_t *) conn->flusher);
Expand Down Expand Up @@ -599,7 +607,8 @@ static void ziti_disconnect_async(uv_async_t *ar) {
case Accepting:
case Connecting:
case Connected:
case CloseWrite: {
case CloseWrite:
case Timedout: {
NEWP(wr, struct ziti_write_req_s);
wr->conn = conn;
wr->cb = ziti_disconnect_cb;
Expand All @@ -620,7 +629,7 @@ static int ziti_disconnect(struct ziti_conn *conn) {
return ZITI_OK;
}

if (conn->state < Timedout) {
if (conn->state <= Timedout) {
NEWP(ar, uv_async_t);
uv_async_init(conn->ziti_ctx->loop, ar, ziti_disconnect_async);
ar->data = conn;
Expand Down

0 comments on commit aff3dde

Please sign in to comment.