From a8381424277e302e99eaa1f8eeb3c1054cfdbb85 Mon Sep 17 00:00:00 2001 From: ekoby <7406535+ekoby@users.noreply.github.com> Date: Fri, 27 Nov 2020 14:29:13 -0500 Subject: [PATCH] implement channel re-connect (#191) * implement channel re-connect channels are persisted on disconnect, or failure to connect. each failed channel will attempt to re-connect with exponential backoff --- inc_internal/zt_internal.h | 6 +- library/channel.c | 109 ++++++++++++++++++++++++++++------- library/connect.c | 22 ++++--- library/ziti.c | 4 +- programs/ziti-prox-c/proxy.c | 1 + 5 files changed, 110 insertions(+), 32 deletions(-) diff --git a/inc_internal/zt_internal.h b/inc_internal/zt_internal.h index baf130f2..7aff5d75 100644 --- a/inc_internal/zt_internal.h +++ b/inc_internal/zt_internal.h @@ -61,12 +61,15 @@ enum conn_state { Accepting, Timedout, CloseWrite, + Disconnected, Closed }; typedef struct ziti_channel { struct ziti_ctx *ctx; - char *ingress; + char *name; + char *host; + int port; uint32_t id; char token[UUID_STR_LEN]; @@ -76,6 +79,7 @@ typedef struct ziti_channel { uv_timer_t latency_timer; enum conn_state state; + uint reconnect_count; struct ch_conn_req **conn_reqs; int conn_reqs_n; diff --git a/library/channel.c b/library/channel.c index 0e5f9d03..15f19b5f 100644 --- a/library/channel.c +++ b/library/channel.c @@ -26,9 +26,15 @@ limitations under the License. #define MAXHOSTNAMELEN 255 #endif +#define BACKOFF_TIME 3000 /* 3 seconds */ +#define MAX_BACKOFF 5 /* max reconnection timeout: (1 << 5) * BACKOFF_TIME = 96 seconds */ + +static void reconnect_channel(ziti_channel_t *ch); + static void on_channel_connect_internal(uv_connect_t *req, int status); static void on_write(uv_write_t *req, int status); + static void async_write(uv_async_t *ar); static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id); @@ -76,7 +82,7 @@ int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch) { ch->conn_reqs = calloc(32, sizeof(struct ch_conn_req *)); ch->conn_reqs_n = 0; - ch->ingress = NULL; + ch->name = NULL; ch->in_next = NULL; ch->in_body_offset = 0; ch->incoming = new_buffer(); @@ -94,7 +100,8 @@ int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch) { void ziti_channel_free(ziti_channel_t* ch) { free(ch->conn_reqs); free_buffer(ch->incoming); - free(ch->ingress); + free(ch->name); + free(ch->host); } int ziti_close_channels(struct ziti_ctx *ziti) { @@ -162,6 +169,12 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch->conn_reqs[ch->conn_reqs_n++] = r; } } + else if (ch->state == Disconnected) { + if (cb) { + cb(ch, cb_ctx, UV_ENOTCONN); + } + return ZITI_GATEWAY_UNAVAILABLE; + } else { ZITI_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF)); return ZITI_WTF; @@ -171,8 +184,8 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch = calloc(1, sizeof(ziti_channel_t)); ziti_channel_init(ztx, ch); - ch->ingress = strdup(ch_name); - ZITI_LOG(INFO, "opening new channel for ingress[%s] ch[%d]", ch->ingress, ch->id); + ch->name = strdup(ch_name); + ZITI_LOG(INFO, "opening new channel for ingress[%s] ch[%d]", ch->name, ch->id); struct http_parser_url ingress; http_parser_url_init(&ingress); @@ -183,6 +196,10 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, int hostoffset = ingress.field_data[UF_HOST].off; snprintf(host, sizeof(host), "%*.*s", hostlen, hostlen, url + hostoffset); + ch->host = strdup(host); + ch->port = ingress.port; + model_map_set(&ch->ctx->channels, ch->name, ch); + uv_connect_t *req = calloc(1, sizeof(uv_connect_t)); req->data = ch; @@ -195,9 +212,7 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch->state = Connecting; - model_map_set(&ch->ctx->channels, ch->ingress, ch); - - uv_mbed_connect(req, &ch->connection, host, ingress.port, on_channel_connect_internal); + uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal); return ZITI_OK; } @@ -450,7 +465,7 @@ static void latency_reply_cb(void *ctx, message *reply) { if (reply->header.content == ContentTypeResultType && message_get_uint64_header(reply, LatencyProbeTime, &ts)) { ch->latency = uv_now(ch->ctx->loop) - ts; - ZITI_LOG(VERBOSE, "channel[%s] latency is now %ld", ch->ingress, ch->latency); + ZITI_LOG(VERBOSE, "ch[%d](%s) latency is now %ld", ch->id, ch->name, ch->latency); } else { ZITI_LOG(WARN, "invalid latency probe result ct[%d]", reply->header.content); } @@ -486,9 +501,10 @@ static void hello_reply_cb(void *ctx, message *msg) { size_t erVersionLen = strlen(erVersion); message_get_bytes_header(msg, HelloVersionHeader, &erVersion, &erVersionLen); ZITI_LOG(INFO, "ch[%d](%s) connected. EdgeRouter version: %.*s", - ch->id, ch->ingress, (int)erVersionLen, erVersion); + ch->id, ch->name, (int) erVersionLen, erVersion); ch->state = Connected; - } else { + } + else { ZITI_LOG(ERROR, "channel[%d] connect rejected: %d %*s", ch->id, success, msg->header.body_len, msg->body); ch->state = Closed; cb_code = ZITI_GATEWAY_UNAVAILABLE; @@ -501,12 +517,17 @@ static void hello_reply_cb(void *ctx, message *msg) { } ch->conn_reqs_n = 0; - // initial latency - ch->latency = uv_now(ch->ctx->loop) - ch->latency; - uv_timer_init(ch->ctx->loop, &ch->latency_timer); - ch->latency_timer.data = ch; - uv_unref((uv_handle_t *) &ch->latency_timer); - uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000); + if (success) { + // initial latency + ch->latency = uv_now(ch->ctx->loop) - ch->latency; + uv_timer_init(ch->ctx->loop, &ch->latency_timer); + ch->latency_timer.data = ch; + uv_unref((uv_handle_t *) &ch->latency_timer); + uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000); + } + else { + reconnect_channel(ch); + } } static void send_hello(ziti_channel_t *ch) { @@ -521,7 +542,7 @@ static void send_hello(ziti_channel_t *ch) { ziti_channel_send_for_reply(ch, ContentTypeHelloType, headers, 1, ch->token, strlen(ch->token), hello_reply_cb, ch); } -static void async_write(uv_async_t* ar) { +static void async_write(uv_async_t *ar) { struct async_write_req *wr = ar->data; @@ -532,9 +553,52 @@ static void async_write(uv_async_t* ar) { uv_close((uv_handle_t *) ar, (uv_close_cb) free); } +static void reconnect_cb(uv_timer_t *t) { + ziti_channel_t *ch = t->data; + + ch->msg_seq = 0; + + uv_connect_t *req = calloc(1, sizeof(uv_connect_t)); + req->data = ch; + + ch->state = Connecting; + + uv_mbed_init(ch->ctx->loop, &ch->connection, ch->ctx->tlsCtx); + ch->connection._stream.data = ch; + uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal); + uv_close((uv_handle_t *) t, (uv_close_cb) free); +} + +static void reconnect_channel(ziti_channel_t *ch) { + ch->reconnect_count++; + uv_timer_t *t = malloc(sizeof(uv_timer_t)); + uv_timer_init(ch->ctx->loop, t); + t->data = ch; + + int count = ch->reconnect_count; + if (count > MAX_BACKOFF) { + count = MAX_BACKOFF; + } + unsigned int backoff = rand() % count; + + uint64_t timeout = (1U << backoff) * BACKOFF_TIME; + ZITI_LOG(INFO, "ch[%d] reconnecting in %ld ms (attempt = %d)", ch->id, timeout, ch->reconnect_count); + uv_timer_start(t, reconnect_cb, timeout, 0); +} + static void on_channel_close(ziti_channel_t *ch, ssize_t code) { ziti_context ztx = ch->ctx; - model_map_remove(&ztx->channels, ch->ingress); + + if (ch->state != Closed) { + ch->state = Disconnected; + } + + ch->latency = UINT64_MAX; + if (uv_is_active((const uv_handle_t *) &ch->latency_timer)) { + uv_timer_stop(&ch->latency_timer); + } + + // model_map_remove(&ztx->channels, ch->name); while (!LIST_EMPTY(&ch->receivers)) { struct msg_receiver *con = LIST_FIRST(&ch->receivers); @@ -542,6 +606,8 @@ static void on_channel_close(ziti_channel_t *ch, ssize_t code) { con->receive(con->receiver, NULL, (int) code); free(con); } + + reconnect_channel(ch); } static void on_write(uv_write_t *req, int status) { @@ -553,8 +619,6 @@ static void on_write(uv_write_t *req, int status) { ziti_channel_t *ch = uv_handle_get_data((const uv_handle_t *) mbed); on_channel_close(ch, status); - } else { -// ZITI_LOG(TRACE, "write completed [status=%d]", status); } if (wr != NULL) { @@ -605,6 +669,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { if (status == 0) { ZITI_LOG(DEBUG, "ch[%d] connected", ch->id); + ch->reconnect_count = 0; uv_mbed_t *mbed = (uv_mbed_t *) req->handle; uv_mbed_read(mbed, ziti_alloc_cb, on_channel_data); if (ch->ctx->opts->router_keepalive != 0) { @@ -613,7 +678,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { send_hello(ch); } else { - ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->ingress, status); + ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->name, status); for (int i = 0; i < ch->conn_reqs_n; i++) { struct ch_conn_req *r = ch->conn_reqs[i]; @@ -622,6 +687,8 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { ch->conn_reqs[i] = NULL; } ch->conn_reqs_n = 0; + ch->state = Disconnected; + reconnect_channel(ch); } free(req); } \ No newline at end of file diff --git a/library/connect.c b/library/connect.c index 852172c0..f1f8a24d 100644 --- a/library/connect.c +++ b/library/connect.c @@ -224,13 +224,12 @@ static void on_channel_connected(ziti_channel_t *ch, void *ctx, int status) { else { if (status < 0) { ZITI_LOG(ERROR, "ch[%d] failed to connect status[%d](%s)", ch->id, status, uv_strerror(status)); - model_map_remove(&ch->ctx->channels, ch->ingress); } else if (conn->conn_req && conn->conn_req->failed) { ZITI_LOG(DEBUG, "request already timed out or closed"); } else { // first channel to connect - ZITI_LOG(DEBUG, "selected ch[%s] status[%d] for conn_id[%d]", ch->ingress, status, conn->conn_id); + ZITI_LOG(DEBUG, "selected ch[%s] status[%d] for conn_id[%d]", ch->name, status, conn->conn_id); conn->channel = ch; ziti_channel_start_connection(conn); @@ -262,7 +261,8 @@ static void connect_timeout(uv_timer_t *timer) { else { ZITI_LOG(ERROR, "timeout for connection[%d] in unexpected state[%d]", conn->conn_id, conn->state); } - uv_close((uv_handle_t *) timer, NULL); + uv_close((uv_handle_t *) timer, free_handle); + conn->conn_req->conn_timeout = NULL; } static int ziti_connect(struct ziti_ctx *ctx, const ziti_net_session *session, struct ziti_conn *conn) { @@ -299,7 +299,7 @@ static int ziti_connect(struct ziti_ctx *ctx, const ziti_net_session *session, s } if (best_ch) { - ZITI_LOG(DEBUG, "selected ch[%s] for best latency(%ldms)", best_ch->ingress, best_ch->latency); + ZITI_LOG(DEBUG, "selected ch[%s] for best latency(%ldms)", best_ch->name, best_ch->latency); on_channel_connected(best_ch, conn, ZITI_OK); } @@ -538,10 +538,16 @@ static void ziti_disconnect_async(uv_async_t *ar) { int ziti_disconnect(struct ziti_conn *conn) { NEWP(ar, uv_async_t); - uv_async_init(conn->channel->ctx->loop, ar, ziti_disconnect_async); - ar->data = conn; - conn->disconnector = ar; - return uv_async_send(conn->disconnector); + if (conn->channel) { + uv_async_init(conn->channel->ctx->loop, ar, ziti_disconnect_async); + ar->data = conn; + conn->disconnector = ar; + return uv_async_send(conn->disconnector); + } + else { + conn->state = Closed; + } + return ZITI_OK; } static void crypto_wr_cb(ziti_connection conn, ssize_t status, void *ctx) { diff --git a/library/ziti.c b/library/ziti.c index 32e5dc4a..ca013ccb 100644 --- a/library/ziti.c +++ b/library/ziti.c @@ -351,7 +351,7 @@ void ziti_dump(ziti_context ztx) { ziti_channel_t *ch; const char *url; MODEL_MAP_FOREACH(url, ch, &ztx->channels) { - printf("ch[%d](%s)\n", ch->id, url); + printf("ch[%d](%s) %s\n", ch->id, url, ch->state == Disconnected ? "Disconnected" : ""); } printf("\n==================\nConnections:\n"); @@ -360,7 +360,7 @@ void ziti_dump(ziti_context ztx) { printf("conn[%d]: state[%d] service[%s] using ch[%d] %s\n", conn->conn_id, conn->state, conn->service, conn->channel ? conn->channel->id : -1, - conn->channel ? conn->channel->ingress : "(none)"); + conn->channel ? conn->channel->name : "(none)"); } } diff --git a/programs/ziti-prox-c/proxy.c b/programs/ziti-prox-c/proxy.c index 71ddaa0f..86ae9fe6 100644 --- a/programs/ziti-prox-c/proxy.c +++ b/programs/ziti-prox-c/proxy.c @@ -242,6 +242,7 @@ void on_ziti_connect(ziti_connection conn, int status) { else { ZITI_LOG(ERROR, "ziti connect failed: %s(%d)", ziti_errorstr(status), status); uv_close((uv_handle_t *) clt, close_cb); + ziti_close(&conn); } }