From 68871fcc7f61163244bb7c5e10d64b7006f0f6e8 Mon Sep 17 00:00:00 2001 From: ekoby <7406535+ekoby@users.noreply.github.com> Date: Mon, 21 Dec 2020 08:16:36 -0500 Subject: [PATCH] Merge 0.18.6 (#207) * fix crash if conn is closed before connect reply * remove controller keep-alive and set timeout to 15s * fix handling of controller errors - AUTH error -- retry, and if fails again stop context, notify services unavailable - NOT AVAILABLE -- try refresh, keep services * ziti_log_level() * force session refresh if router closes edge connection * log changing log level --- inc_internal/zt_internal.h | 6 ++- includes/ziti/ziti_log.h | 4 +- library/channel.c | 82 ++++++++++++++++++++------------------ library/connect.c | 6 +++ library/utils.c | 25 ++++++++---- library/ziti.c | 35 ++++++++-------- library/ziti_ctrl.c | 2 +- library/ziti_enroll.c | 2 +- 8 files changed, 92 insertions(+), 70 deletions(-) diff --git a/inc_internal/zt_internal.h b/inc_internal/zt_internal.h index 41d17ce5..253c4654 100644 --- a/inc_internal/zt_internal.h +++ b/inc_internal/zt_internal.h @@ -207,6 +207,8 @@ extern "C" { void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx); +void ziti_force_session_refresh(ziti_context ztx); + int ziti_close_channels(ziti_context ztx); int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx); @@ -221,11 +223,13 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i uint32_t body_len, struct ziti_write_req_s *ziti_write); -int +struct waiter_s* ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *headers, int nhdrs, const uint8_t *body, uint32_t body_len, reply_cb, void *reply_ctx); +void ziti_channel_remove_waiter(ziti_channel_t *ch, struct waiter_s *waiter); + int load_config(const char *filename, ziti_config **); int load_jwt(const char *filename, struct enroll_cfg_s *ecfg, ziti_enrollment_jwt_header **, ziti_enrollment_jwt **); diff --git a/includes/ziti/ziti_log.h b/includes/ziti/ziti_log.h index 8c06203c..41b67eff 100644 --- a/includes/ziti/ziti_log.h +++ b/includes/ziti/ziti_log.h @@ -40,7 +40,7 @@ enum DebugLevel { }; #define ZITI_LOG(level, fmt, ...) do { \ -if (level <= ziti_log_level) { ziti_logger(level, __FILE__, __LINE__, __func__, fmt, ##__VA_ARGS__); }\ +if (level <= ziti_log_level()) { ziti_logger(level, __FILE__, __LINE__, __func__, fmt, ##__VA_ARGS__); }\ } while(0) #ifdef __cplusplus @@ -65,7 +65,7 @@ ZITI_FUNC extern void ziti_log_set_logger(log_writer logger); ZITI_FUNC extern void ziti_log_set_level(int level); // don't use directly -ZITI_FUNC extern int ziti_log_level; +ZITI_FUNC extern int ziti_log_level(); #ifdef __cplusplus } diff --git a/library/channel.c b/library/channel.c index c4f4c726..ca09d94b 100644 --- a/library/channel.c +++ b/library/channel.c @@ -102,7 +102,7 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t return 0; } -void ziti_channel_free(ziti_channel_t* ch) { +void ziti_channel_free(ziti_channel_t *ch) { free(ch->conn_reqs); free_buffer(ch->incoming); FREE(ch->name); @@ -168,8 +168,7 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, if (ch->state == Connected) { cb(ch, cb_ctx, ZITI_OK); - } - else if (ch->state == Connecting || ch->state == Initial) { + } else if (ch->state == Connecting || ch->state == Initial) { // not connected yet, add to the callbacks if (cb != NULL) { NEWP(r, struct ch_conn_req); @@ -177,14 +176,12 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, r->ctx = cb_ctx; ch->conn_reqs[ch->conn_reqs_n++] = r; } - } - else if (ch->state == Disconnected) { + } else if (ch->state == Disconnected) { if (cb) { cb(ch, cb_ctx, UV_ENOTCONN); } return ZITI_GATEWAY_UNAVAILABLE; - } - else { + } else { ZITI_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF)); return ZITI_WTF; } @@ -270,9 +267,18 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i return uv_mbed_write(req, &ch->connection, &buf, on_channel_send); } -int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, int nhdrs, const uint8_t *body, - uint32_t body_len, - reply_cb rep_cb, void *reply_ctx) { +void ziti_channel_remove_waiter(ziti_channel_t *ch, struct waiter_s *waiter) { + if (waiter) { + LIST_REMOVE(waiter, next); + free(waiter); + } +} + +struct waiter_s * +ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, int nhdrs, const uint8_t *body, + uint32_t body_len, + reply_cb rep_cb, void *reply_ctx) { + struct waiter_s *result = NULL; header_t header; header_init(&header, ch->msg_seq++); @@ -304,6 +310,7 @@ int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_ w->reply_ctx = reply_ctx; LIST_INSERT_HEAD(&ch->waiters, w, next); + result = w; } NEWP(wr, struct async_write_req); @@ -317,12 +324,11 @@ int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_ // Guard against write requests coming on a thread different from our loop if (uv_thread_self() == ch->ctx->loop_thread) { async_write(async_req); - } - else { + } else { uv_async_send(async_req); } - return 0; + return result; } static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id) { @@ -352,6 +358,7 @@ static bool is_edge(int32_t content) { return false; } } + static void dispatch_message(ziti_channel_t *ch, message *m) { struct waiter_s *w = NULL; @@ -383,20 +390,17 @@ static void dispatch_message(ziti_channel_t *ch, message *m) { if (!has_conn_id) { ZITI_LOG(ERROR, "ch[%d] received message without conn_id ct[%d]", ch->id, m->header.content); - } - else { + } else { struct msg_receiver *conn = find_receiver(ch, conn_id); if (conn == NULL) { ZITI_LOG(DEBUG, "ch[%d] received message for unknown connection conn_id[%d] ct[%d]", ch->id, conn_id, m->header.content); - } - else { + } else { conn->receive(conn->receiver, m, ZITI_OK); } } - } - else { - ZITI_LOG(WARN, "ch[%d] unsupported content type [%d]", ch->id, m->header.content); + } else { + ZITI_LOG(WARN, "ch[%d] unsupported content type [%d]", ch->id, m->header.content); } } @@ -413,7 +417,7 @@ static void process_inbound(ziti_channel_t *ch) { uint8_t header_buf[HEADER_SIZE]; int header_read = 0; - while(header_read < HEADER_SIZE) { + while (header_read < HEADER_SIZE) { len = buffer_get_next(ch->incoming, HEADER_SIZE - header_read, &ptr); memcpy(header_buf + header_read, ptr, len); header_read += len; @@ -515,8 +519,7 @@ static void hello_reply_cb(void *ctx, message *msg) { FREE(ch->version); ch->version = strndup(erVersion, erVersionLen); ch->notify_cb(ch, EdgeRouterConnected, ch->notify_ctx); - } - else { + } else { ZITI_LOG(ERROR, "channel[%d] connect rejected: %d %*s", ch->id, success, msg->header.body_len, msg->body); ch->state = Closed; cb_code = ZITI_GATEWAY_UNAVAILABLE; @@ -537,8 +540,7 @@ static void hello_reply_cb(void *ctx, message *msg) { ch->latency_timer.data = ch; uv_unref((uv_handle_t *) &ch->latency_timer); uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000); - } - else { + } else { reconnect_channel(ch); } } @@ -568,17 +570,23 @@ static void async_write(uv_async_t *ar) { static void reconnect_cb(uv_timer_t *t) { ziti_channel_t *ch = t->data; + ziti_context ztx = ch->ctx; - ch->msg_seq = 0; + if (ztx->session == NULL || ztx->session->token == NULL) { + ZITI_LOG(ERROR, "ziti context is not authenticated, delaying re-connect"); + reconnect_channel(ch); + } else { + ch->msg_seq = 0; - uv_connect_t *req = calloc(1, sizeof(uv_connect_t)); - req->data = ch; + uv_connect_t *req = calloc(1, sizeof(uv_connect_t)); + req->data = ch; - ch->state = Connecting; + ch->state = Connecting; - uv_mbed_init(ch->loop, &ch->connection, ch->connection.tls); - ch->connection._stream.data = ch; - uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal); + uv_mbed_init(ch->loop, &ch->connection, ch->connection.tls); + ch->connection._stream.data = ch; + uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal); + } uv_close((uv_handle_t *) t, (uv_close_cb) free); } @@ -620,6 +628,7 @@ static void on_channel_close(ziti_channel_t *ch, ssize_t code) { if (ch->state != Closed) { reconnect_channel(ch); + ziti_force_session_refresh(ch->ctx); } } @@ -663,12 +672,10 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { on_channel_close(ch, len); } - } - else if (len == 0) { + } else if (len == 0) { // sometimes SSL message has no payload free(buf->base); - } - else { + } else { ZITI_LOG(TRACE, "ch[%d] on_data [len=%zd]", ch->id, len); if (len > 0) { buffer_append(ch->incoming, buf->base, (uint32_t) len); @@ -689,8 +696,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) { uv_mbed_keepalive(mbed, 1, ch->ctx->opts->router_keepalive); } send_hello(ch); - } - else { + } else { ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->name, status); for (int i = 0; i < ch->conn_reqs_n; i++) { diff --git a/library/connect.c b/library/connect.c index 5935195e..0c8cc7a7 100644 --- a/library/connect.c +++ b/library/connect.c @@ -37,6 +37,7 @@ struct ziti_conn_req { ziti_dial_opts *dial_opts; ziti_listen_opts *listen_opts; + struct waiter_s *waiter; int retry_count; uv_timer_t *conn_timeout; bool failed; @@ -126,6 +127,7 @@ int close_conn_internal(struct ziti_conn *conn) { } if (conn->conn_req) { + ziti_channel_remove_waiter(conn->channel, conn->conn_req->waiter); free_conn_req(conn->conn_req); } @@ -758,6 +760,8 @@ void connect_reply_cb(void *ctx, message *msg) { uv_timer_stop(req->conn_timeout); } + req->waiter = NULL; + switch (msg->header.content) { case ContentTypeStateClosed: if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) { @@ -948,6 +952,8 @@ int ziti_channel_start_connection(struct ziti_conn *conn) { } break; } + + req->waiter = ziti_channel_send_for_reply(ch, content_type, headers, nheaders, conn->token, strlen(conn->token), connect_reply_cb, conn); diff --git a/library/utils.c b/library/utils.c index f6c831e9..7f9c349d 100644 --- a/library/utils.c +++ b/library/utils.c @@ -99,7 +99,7 @@ const char* ziti_git_commit() { return to_str(ZITI_COMMIT); } -int ziti_log_level = ZITI_LOG_DEFAULT_LEVEL; +static int ziti_log_lvl = ZITI_LOG_DEFAULT_LEVEL; static FILE *ziti_debug_out; static bool log_initialized = false; @@ -128,7 +128,7 @@ void ziti_log_init(uv_loop_t *loop, int level, log_writer log_func) { init_debug(loop); if (level == ZITI_LOG_DEFAULT_LEVEL) - level = ziti_log_level; // in case it was set before + level = ziti_log_lvl; // in case it was set before ziti_log_set_level(level); @@ -144,15 +144,24 @@ void ziti_log_set_level(int level) { if (level == ZITI_LOG_DEFAULT_LEVEL) { char *lvl = getenv("ZITI_LOG"); if (lvl != NULL) { - ziti_log_level = (int) strtol(lvl, NULL, 10); + ziti_log_lvl = (int) strtol(lvl, NULL, 10); } else { - ziti_log_level = INFO; + ziti_log_lvl = INFO; } } else { - ziti_log_level = level; + ziti_log_lvl = level; } - uv_mbed_set_debug(ziti_log_level, uv_mbed_logger); + uv_mbed_set_debug(ziti_log_lvl, uv_mbed_logger); + if (logger) { + char msg[128]; + int len = snprintf(msg, sizeof(msg), "set log level: ziti_log_lvl=%d &ziti_log_lvl = %p", ziti_log_lvl, &ziti_log_lvl); + logger(INFO, "ziti_log_set_level", msg, len); + } +} + +int ziti_log_level() { + return ziti_log_lvl; } void ziti_log_set_logger(log_writer log) { @@ -170,7 +179,7 @@ static void init_debug(uv_loop_t *loop) { } ts_loop = loop; log_initialized = true; - ziti_log_set_level(ziti_log_level); + ziti_log_set_level(ziti_log_lvl); ziti_debug_out = stderr; starttime = uv_now(loop); @@ -277,7 +286,7 @@ int lt_zero(int v) { return v < 0; } int non_zero(int v) { return v != 0; } void hexDump (char *desc, void *addr, int len) { - if (DEBUG > ziti_log_level) return; + if (DEBUG > ziti_log_level()) return; ZITI_LOG(DEBUG, " "); int i; unsigned char buffLine[17]; diff --git a/library/ziti.c b/library/ziti.c index 4a3d98c5..7ac63cb1 100644 --- a/library/ziti.c +++ b/library/ziti.c @@ -71,14 +71,15 @@ XX(Initial)\ XX(Accepting) \ XX(Closed) -static const char* strstate(enum conn_state st) { +static const char *strstate(enum conn_state st) { #define state_case(s) case s: return #s; switch (st) { CONN_STATES(state_case) - default: return ""; + default: + return ""; } #undef state_case } @@ -91,13 +92,11 @@ static size_t parse_ref(const char *val, const char **res) { // load file *res = val + strlen("file://"); len = strlen(*res) + 1; - } - else if (strncmp("pem:", val, 4) == 0) { + } else if (strncmp("pem:", val, 4) == 0) { // load inline PEM *res = val + 4; len = strlen(val + 4) + 1; - } - else { + } else { *res = val; len = strlen(val) + 1; } @@ -112,12 +111,11 @@ static int parse_getopt(const char *q, const char *opt, char *out, size_t maxout if (strncasecmp(q, opt, optlen) == 0 && (q[optlen] == '=' || q[optlen] == 0)) { const char *val = q + optlen + 1; char *end = strchr(val, '&'); - int vlen = (int)(end == NULL ? strlen(val) : end - val); + int vlen = (int) (end == NULL ? strlen(val) : end - val); snprintf(out, maxout, "%*.*s", vlen, vlen, val); return ZITI_OK; - } - else { // skip to next '&' + } else { // skip to next '&' q = strchr(q, '&'); if (q == NULL) { break; @@ -241,7 +239,7 @@ static void ziti_init_async(uv_async_t *ar) { ziti_get_build_version(false), ziti_git_commit(), ziti_git_branch(), time_str, start_time.tv_usec / 1000); - + init_req->login = true; ziti_ctrl_init(loop, &ctx->controller, ctx->opts->controller, ctx->tlsCtx); ziti_ctrl_get_version(&ctx->controller, version_cb, ctx); @@ -520,6 +518,10 @@ static void session_refresh(uv_timer_t *t) { ziti_ctrl_current_api_session(&ztx->controller, session_cb, req); } +void ziti_force_session_refresh(ziti_context ztx) { + uv_timer_start(&ztx->session_timer, session_refresh, 0, 0); +} + static void ziti_re_auth(ziti_context ztx) { ZITI_LOG(WARN, "starting to re-auth with ctlr[%s]", ztx->opts->controller); uv_timer_stop(&ztx->refresh_timer); @@ -678,8 +680,7 @@ static void session_cb(ziti_session *session, ziti_error *err, void *ctx) { if (ztx->opts->refresh_interval > 0 && !uv_is_active((const uv_handle_t *) &ztx->refresh_timer)) { ZITI_LOG(DEBUG, "refresh_interval set to %ld seconds", ztx->opts->refresh_interval); uv_timer_start(&ztx->refresh_timer, services_refresh, 0, ztx->opts->refresh_interval * 1000); - } - else if (ztx->opts->refresh_interval == 0) { + } else if (ztx->opts->refresh_interval == 0) { ZITI_LOG(DEBUG, "refresh_interval not specified"); uv_timer_stop(&ztx->refresh_timer); } @@ -699,8 +700,7 @@ static void session_cb(ziti_session *session, ziti_error *err, void *ctx) { // just try to re-auth ziti_re_auth(ztx); errCode = ztx->ctrl_status; // do not trigger event yet - } - else { + } else { // cannot login or re-auth -- identity no longer valid // notify service removal, and state ZITI_LOG(ERROR, "identity[%s] cannot authenticate with ctrl[%s]", ztx->opts->config, @@ -730,13 +730,10 @@ static void session_cb(ziti_session *session, ziti_error *err, void *ctx) { uv_timer_stop(&ztx->posture_checks->timer); } } - } - else { + } else { uv_timer_start(&ztx->session_timer, session_refresh, 5 * 1000, 0); } - - } - else { + } else { ZITI_LOG(ERROR, "%s: no session or error received", ziti_errorstr(ZITI_WTF)); } diff --git a/library/ziti_ctrl.c b/library/ziti_ctrl.c index d99ce698..ee04a684 100644 --- a/library/ziti_ctrl.c +++ b/library/ziti_ctrl.c @@ -23,7 +23,7 @@ limitations under the License. #define DEFAULT_PAGE_SIZE 25 #define ZITI_CTRL_KEEPALIVE 0 -#define ZITI_CTRL_TIMEOUT 10000 +#define ZITI_CTRL_TIMEOUT 15000 const char *const PC_DOMAIN_TYPE = "DOMAIN"; const char *const PC_OS_TYPE = "OS"; diff --git a/library/ziti_enroll.c b/library/ziti_enroll.c index e389a5dc..42651aec 100644 --- a/library/ziti_enroll.c +++ b/library/ziti_enroll.c @@ -106,7 +106,7 @@ int ziti_enroll(ziti_enroll_opts *opts, uv_loop_t *loop, ziti_enroll_cb enroll_c ecfg->private_key = opts->enroll_key; TRY(ziti, load_jwt(opts->jwt, ecfg, &ecfg->zejh, &ecfg->zej)); - if (DEBUG <= ziti_log_level) { + if (DEBUG <= ziti_log_level()) { dump_ziti_enrollment_jwt_header(ecfg->zejh, 0); dump_ziti_enrollment_jwt(ecfg->zej, 0); }