Skip to content

Commit

Permalink
implement channel re-connect (#191)
Browse files Browse the repository at this point in the history
* implement channel re-connect

channels are persisted on disconnect, or failure to connect. each failed channel will attempt to re-connect with exponential backoff
  • Loading branch information
ekoby authored Nov 27, 2020
1 parent 1bbc808 commit a838142
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 32 deletions.
6 changes: 5 additions & 1 deletion inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@ enum conn_state {
Accepting,
Timedout,
CloseWrite,
Disconnected,
Closed
};

typedef struct ziti_channel {
struct ziti_ctx *ctx;
char *ingress;
char *name;
char *host;
int port;

uint32_t id;
char token[UUID_STR_LEN];
Expand All @@ -76,6 +79,7 @@ typedef struct ziti_channel {
uv_timer_t latency_timer;

enum conn_state state;
uint reconnect_count;

struct ch_conn_req **conn_reqs;
int conn_reqs_n;
Expand Down
109 changes: 88 additions & 21 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ limitations under the License.
#define MAXHOSTNAMELEN 255
#endif

#define BACKOFF_TIME 3000 /* 3 seconds */
#define MAX_BACKOFF 5 /* max reconnection timeout: (1 << 5) * BACKOFF_TIME = 96 seconds */

static void reconnect_channel(ziti_channel_t *ch);

static void on_channel_connect_internal(uv_connect_t *req, int status);

static void on_write(uv_write_t *req, int status);

static void async_write(uv_async_t *ar);

static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id);
Expand Down Expand Up @@ -76,7 +82,7 @@ int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch) {
ch->conn_reqs = calloc(32, sizeof(struct ch_conn_req *));
ch->conn_reqs_n = 0;

ch->ingress = NULL;
ch->name = NULL;
ch->in_next = NULL;
ch->in_body_offset = 0;
ch->incoming = new_buffer();
Expand All @@ -94,7 +100,8 @@ int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch) {
void ziti_channel_free(ziti_channel_t* ch) {
free(ch->conn_reqs);
free_buffer(ch->incoming);
free(ch->ingress);
free(ch->name);
free(ch->host);
}

int ziti_close_channels(struct ziti_ctx *ziti) {
Expand Down Expand Up @@ -162,6 +169,12 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,
ch->conn_reqs[ch->conn_reqs_n++] = r;
}
}
else if (ch->state == Disconnected) {
if (cb) {
cb(ch, cb_ctx, UV_ENOTCONN);
}
return ZITI_GATEWAY_UNAVAILABLE;
}
else {
ZITI_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF));
return ZITI_WTF;
Expand All @@ -171,8 +184,8 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,

ch = calloc(1, sizeof(ziti_channel_t));
ziti_channel_init(ztx, ch);
ch->ingress = strdup(ch_name);
ZITI_LOG(INFO, "opening new channel for ingress[%s] ch[%d]", ch->ingress, ch->id);
ch->name = strdup(ch_name);
ZITI_LOG(INFO, "opening new channel for ingress[%s] ch[%d]", ch->name, ch->id);

struct http_parser_url ingress;
http_parser_url_init(&ingress);
Expand All @@ -183,6 +196,10 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,
int hostoffset = ingress.field_data[UF_HOST].off;
snprintf(host, sizeof(host), "%*.*s", hostlen, hostlen, url + hostoffset);

ch->host = strdup(host);
ch->port = ingress.port;
model_map_set(&ch->ctx->channels, ch->name, ch);

uv_connect_t *req = calloc(1, sizeof(uv_connect_t));
req->data = ch;

Expand All @@ -195,9 +212,7 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,

ch->state = Connecting;

model_map_set(&ch->ctx->channels, ch->ingress, ch);

uv_mbed_connect(req, &ch->connection, host, ingress.port, on_channel_connect_internal);
uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal);
return ZITI_OK;
}

Expand Down Expand Up @@ -450,7 +465,7 @@ static void latency_reply_cb(void *ctx, message *reply) {
if (reply->header.content == ContentTypeResultType &&
message_get_uint64_header(reply, LatencyProbeTime, &ts)) {
ch->latency = uv_now(ch->ctx->loop) - ts;
ZITI_LOG(VERBOSE, "channel[%s] latency is now %ld", ch->ingress, ch->latency);
ZITI_LOG(VERBOSE, "ch[%d](%s) latency is now %ld", ch->id, ch->name, ch->latency);
} else {
ZITI_LOG(WARN, "invalid latency probe result ct[%d]", reply->header.content);
}
Expand Down Expand Up @@ -486,9 +501,10 @@ static void hello_reply_cb(void *ctx, message *msg) {
size_t erVersionLen = strlen(erVersion);
message_get_bytes_header(msg, HelloVersionHeader, &erVersion, &erVersionLen);
ZITI_LOG(INFO, "ch[%d](%s) connected. EdgeRouter version: %.*s",
ch->id, ch->ingress, (int)erVersionLen, erVersion);
ch->id, ch->name, (int) erVersionLen, erVersion);
ch->state = Connected;
} else {
}
else {
ZITI_LOG(ERROR, "channel[%d] connect rejected: %d %*s", ch->id, success, msg->header.body_len, msg->body);
ch->state = Closed;
cb_code = ZITI_GATEWAY_UNAVAILABLE;
Expand All @@ -501,12 +517,17 @@ static void hello_reply_cb(void *ctx, message *msg) {
}
ch->conn_reqs_n = 0;

// initial latency
ch->latency = uv_now(ch->ctx->loop) - ch->latency;
uv_timer_init(ch->ctx->loop, &ch->latency_timer);
ch->latency_timer.data = ch;
uv_unref((uv_handle_t *) &ch->latency_timer);
uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000);
if (success) {
// initial latency
ch->latency = uv_now(ch->ctx->loop) - ch->latency;
uv_timer_init(ch->ctx->loop, &ch->latency_timer);
ch->latency_timer.data = ch;
uv_unref((uv_handle_t *) &ch->latency_timer);
uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000);
}
else {
reconnect_channel(ch);
}
}

static void send_hello(ziti_channel_t *ch) {
Expand All @@ -521,7 +542,7 @@ static void send_hello(ziti_channel_t *ch) {
ziti_channel_send_for_reply(ch, ContentTypeHelloType, headers, 1, ch->token, strlen(ch->token), hello_reply_cb, ch);
}

static void async_write(uv_async_t* ar) {
static void async_write(uv_async_t *ar) {

struct async_write_req *wr = ar->data;

Expand All @@ -532,16 +553,61 @@ static void async_write(uv_async_t* ar) {
uv_close((uv_handle_t *) ar, (uv_close_cb) free);
}

static void reconnect_cb(uv_timer_t *t) {
ziti_channel_t *ch = t->data;

ch->msg_seq = 0;

uv_connect_t *req = calloc(1, sizeof(uv_connect_t));
req->data = ch;

ch->state = Connecting;

uv_mbed_init(ch->ctx->loop, &ch->connection, ch->ctx->tlsCtx);
ch->connection._stream.data = ch;
uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal);
uv_close((uv_handle_t *) t, (uv_close_cb) free);
}

static void reconnect_channel(ziti_channel_t *ch) {
ch->reconnect_count++;
uv_timer_t *t = malloc(sizeof(uv_timer_t));
uv_timer_init(ch->ctx->loop, t);
t->data = ch;

int count = ch->reconnect_count;
if (count > MAX_BACKOFF) {
count = MAX_BACKOFF;
}
unsigned int backoff = rand() % count;

uint64_t timeout = (1U << backoff) * BACKOFF_TIME;
ZITI_LOG(INFO, "ch[%d] reconnecting in %ld ms (attempt = %d)", ch->id, timeout, ch->reconnect_count);
uv_timer_start(t, reconnect_cb, timeout, 0);
}

static void on_channel_close(ziti_channel_t *ch, ssize_t code) {
ziti_context ztx = ch->ctx;
model_map_remove(&ztx->channels, ch->ingress);

if (ch->state != Closed) {
ch->state = Disconnected;
}

ch->latency = UINT64_MAX;
if (uv_is_active((const uv_handle_t *) &ch->latency_timer)) {
uv_timer_stop(&ch->latency_timer);
}

// model_map_remove(&ztx->channels, ch->name);

while (!LIST_EMPTY(&ch->receivers)) {
struct msg_receiver *con = LIST_FIRST(&ch->receivers);
LIST_REMOVE(con, _next);
con->receive(con->receiver, NULL, (int) code);
free(con);
}

reconnect_channel(ch);
}

static void on_write(uv_write_t *req, int status) {
Expand All @@ -553,8 +619,6 @@ static void on_write(uv_write_t *req, int status) {
ziti_channel_t *ch = uv_handle_get_data((const uv_handle_t *) mbed);
on_channel_close(ch, status);

} else {
// ZITI_LOG(TRACE, "write completed [status=%d]", status);
}

if (wr != NULL) {
Expand Down Expand Up @@ -605,6 +669,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {

if (status == 0) {
ZITI_LOG(DEBUG, "ch[%d] connected", ch->id);
ch->reconnect_count = 0;
uv_mbed_t *mbed = (uv_mbed_t *) req->handle;
uv_mbed_read(mbed, ziti_alloc_cb, on_channel_data);
if (ch->ctx->opts->router_keepalive != 0) {
Expand All @@ -613,7 +678,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
send_hello(ch);
}
else {
ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->ingress, status);
ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->name, status);

for (int i = 0; i < ch->conn_reqs_n; i++) {
struct ch_conn_req *r = ch->conn_reqs[i];
Expand All @@ -622,6 +687,8 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
ch->conn_reqs[i] = NULL;
}
ch->conn_reqs_n = 0;
ch->state = Disconnected;
reconnect_channel(ch);
}
free(req);
}
22 changes: 14 additions & 8 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,12 @@ static void on_channel_connected(ziti_channel_t *ch, void *ctx, int status) {
else {
if (status < 0) {
ZITI_LOG(ERROR, "ch[%d] failed to connect status[%d](%s)", ch->id, status, uv_strerror(status));
model_map_remove(&ch->ctx->channels, ch->ingress);
}
else if (conn->conn_req && conn->conn_req->failed) {
ZITI_LOG(DEBUG, "request already timed out or closed");
}
else { // first channel to connect
ZITI_LOG(DEBUG, "selected ch[%s] status[%d] for conn_id[%d]", ch->ingress, status, conn->conn_id);
ZITI_LOG(DEBUG, "selected ch[%s] status[%d] for conn_id[%d]", ch->name, status, conn->conn_id);

conn->channel = ch;
ziti_channel_start_connection(conn);
Expand Down Expand Up @@ -262,7 +261,8 @@ static void connect_timeout(uv_timer_t *timer) {
else {
ZITI_LOG(ERROR, "timeout for connection[%d] in unexpected state[%d]", conn->conn_id, conn->state);
}
uv_close((uv_handle_t *) timer, NULL);
uv_close((uv_handle_t *) timer, free_handle);
conn->conn_req->conn_timeout = NULL;
}

static int ziti_connect(struct ziti_ctx *ctx, const ziti_net_session *session, struct ziti_conn *conn) {
Expand Down Expand Up @@ -299,7 +299,7 @@ static int ziti_connect(struct ziti_ctx *ctx, const ziti_net_session *session, s
}

if (best_ch) {
ZITI_LOG(DEBUG, "selected ch[%s] for best latency(%ldms)", best_ch->ingress, best_ch->latency);
ZITI_LOG(DEBUG, "selected ch[%s] for best latency(%ldms)", best_ch->name, best_ch->latency);
on_channel_connected(best_ch, conn, ZITI_OK);
}

Expand Down Expand Up @@ -538,10 +538,16 @@ static void ziti_disconnect_async(uv_async_t *ar) {

int ziti_disconnect(struct ziti_conn *conn) {
NEWP(ar, uv_async_t);
uv_async_init(conn->channel->ctx->loop, ar, ziti_disconnect_async);
ar->data = conn;
conn->disconnector = ar;
return uv_async_send(conn->disconnector);
if (conn->channel) {
uv_async_init(conn->channel->ctx->loop, ar, ziti_disconnect_async);
ar->data = conn;
conn->disconnector = ar;
return uv_async_send(conn->disconnector);
}
else {
conn->state = Closed;
}
return ZITI_OK;
}

static void crypto_wr_cb(ziti_connection conn, ssize_t status, void *ctx) {
Expand Down
4 changes: 2 additions & 2 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ void ziti_dump(ziti_context ztx) {
ziti_channel_t *ch;
const char *url;
MODEL_MAP_FOREACH(url, ch, &ztx->channels) {
printf("ch[%d](%s)\n", ch->id, url);
printf("ch[%d](%s) %s\n", ch->id, url, ch->state == Disconnected ? "Disconnected" : "");
}

printf("\n==================\nConnections:\n");
Expand All @@ -360,7 +360,7 @@ void ziti_dump(ziti_context ztx) {
printf("conn[%d]: state[%d] service[%s] using ch[%d] %s\n",
conn->conn_id, conn->state, conn->service,
conn->channel ? conn->channel->id : -1,
conn->channel ? conn->channel->ingress : "(none)");
conn->channel ? conn->channel->name : "(none)");
}
}

Expand Down
1 change: 1 addition & 0 deletions programs/ziti-prox-c/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void on_ziti_connect(ziti_connection conn, int status) {
else {
ZITI_LOG(ERROR, "ziti connect failed: %s(%d)", ziti_errorstr(status), status);
uv_close((uv_handle_t *) clt, close_cb);
ziti_close(&conn);
}
}

Expand Down

0 comments on commit a838142

Please sign in to comment.