Skip to content

Commit

Permalink
Merge 0.18.6 (#207)
Browse files Browse the repository at this point in the history
* fix crash if conn is closed before connect reply

* remove controller keep-alive and set timeout to 15s

* fix handling of controller errors

- AUTH error -- retry, and if fails again stop context, notify services
  unavailable
- NOT AVAILABLE -- try refresh, keep services

* ziti_log_level()

* force session refresh if router closes edge connection

* log changing log level
  • Loading branch information
ekoby authored Dec 21, 2020
1 parent 2e36335 commit 68871fc
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 70 deletions.
6 changes: 5 additions & 1 deletion inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ extern "C" {

void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx);

void ziti_force_session_refresh(ziti_context ztx);

int ziti_close_channels(ziti_context ztx);

int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx);
Expand All @@ -221,11 +223,13 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i
uint32_t body_len,
struct ziti_write_req_s *ziti_write);

int
struct waiter_s*
ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *headers, int nhdrs, const uint8_t *body,
uint32_t body_len, reply_cb,
void *reply_ctx);

void ziti_channel_remove_waiter(ziti_channel_t *ch, struct waiter_s *waiter);

int load_config(const char *filename, ziti_config **);

int load_jwt(const char *filename, struct enroll_cfg_s *ecfg, ziti_enrollment_jwt_header **, ziti_enrollment_jwt **);
Expand Down
4 changes: 2 additions & 2 deletions includes/ziti/ziti_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ enum DebugLevel {
};

#define ZITI_LOG(level, fmt, ...) do { \
if (level <= ziti_log_level) { ziti_logger(level, __FILE__, __LINE__, __func__, fmt, ##__VA_ARGS__); }\
if (level <= ziti_log_level()) { ziti_logger(level, __FILE__, __LINE__, __func__, fmt, ##__VA_ARGS__); }\
} while(0)

#ifdef __cplusplus
Expand All @@ -65,7 +65,7 @@ ZITI_FUNC extern void ziti_log_set_logger(log_writer logger);
ZITI_FUNC extern void ziti_log_set_level(int level);

// don't use directly
ZITI_FUNC extern int ziti_log_level;
ZITI_FUNC extern int ziti_log_level();

#ifdef __cplusplus
}
Expand Down
82 changes: 44 additions & 38 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
return 0;
}

void ziti_channel_free(ziti_channel_t* ch) {
void ziti_channel_free(ziti_channel_t *ch) {
free(ch->conn_reqs);
free_buffer(ch->incoming);
FREE(ch->name);
Expand Down Expand Up @@ -168,23 +168,20 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,

if (ch->state == Connected) {
cb(ch, cb_ctx, ZITI_OK);
}
else if (ch->state == Connecting || ch->state == Initial) {
} else if (ch->state == Connecting || ch->state == Initial) {
// not connected yet, add to the callbacks
if (cb != NULL) {
NEWP(r, struct ch_conn_req);
r->cb = cb;
r->ctx = cb_ctx;
ch->conn_reqs[ch->conn_reqs_n++] = r;
}
}
else if (ch->state == Disconnected) {
} else if (ch->state == Disconnected) {
if (cb) {
cb(ch, cb_ctx, UV_ENOTCONN);
}
return ZITI_GATEWAY_UNAVAILABLE;
}
else {
} else {
ZITI_LOG(ERROR, "should not be here: %s", ziti_errorstr(ZITI_WTF));
return ZITI_WTF;
}
Expand Down Expand Up @@ -270,9 +267,18 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i
return uv_mbed_write(req, &ch->connection, &buf, on_channel_send);
}

int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, int nhdrs, const uint8_t *body,
uint32_t body_len,
reply_cb rep_cb, void *reply_ctx) {
void ziti_channel_remove_waiter(ziti_channel_t *ch, struct waiter_s *waiter) {
if (waiter) {
LIST_REMOVE(waiter, next);
free(waiter);
}
}

struct waiter_s *
ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, int nhdrs, const uint8_t *body,
uint32_t body_len,
reply_cb rep_cb, void *reply_ctx) {
struct waiter_s *result = NULL;
header_t header;
header_init(&header, ch->msg_seq++);

Expand Down Expand Up @@ -304,6 +310,7 @@ int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_
w->reply_ctx = reply_ctx;

LIST_INSERT_HEAD(&ch->waiters, w, next);
result = w;
}

NEWP(wr, struct async_write_req);
Expand All @@ -317,12 +324,11 @@ int ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_
// Guard against write requests coming on a thread different from our loop
if (uv_thread_self() == ch->ctx->loop_thread) {
async_write(async_req);
}
else {
} else {
uv_async_send(async_req);
}

return 0;
return result;
}

static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id) {
Expand Down Expand Up @@ -352,6 +358,7 @@ static bool is_edge(int32_t content) {
return false;
}
}

static void dispatch_message(ziti_channel_t *ch, message *m) {
struct waiter_s *w = NULL;

Expand Down Expand Up @@ -383,20 +390,17 @@ static void dispatch_message(ziti_channel_t *ch, message *m) {

if (!has_conn_id) {
ZITI_LOG(ERROR, "ch[%d] received message without conn_id ct[%d]", ch->id, m->header.content);
}
else {
} else {
struct msg_receiver *conn = find_receiver(ch, conn_id);
if (conn == NULL) {
ZITI_LOG(DEBUG, "ch[%d] received message for unknown connection conn_id[%d] ct[%d]",
ch->id, conn_id, m->header.content);
}
else {
} else {
conn->receive(conn->receiver, m, ZITI_OK);
}
}
}
else {
ZITI_LOG(WARN, "ch[%d] unsupported content type [%d]", ch->id, m->header.content);
} else {
ZITI_LOG(WARN, "ch[%d] unsupported content type [%d]", ch->id, m->header.content);
}
}

Expand All @@ -413,7 +417,7 @@ static void process_inbound(ziti_channel_t *ch) {
uint8_t header_buf[HEADER_SIZE];
int header_read = 0;

while(header_read < HEADER_SIZE) {
while (header_read < HEADER_SIZE) {
len = buffer_get_next(ch->incoming, HEADER_SIZE - header_read, &ptr);
memcpy(header_buf + header_read, ptr, len);
header_read += len;
Expand Down Expand Up @@ -515,8 +519,7 @@ static void hello_reply_cb(void *ctx, message *msg) {
FREE(ch->version);
ch->version = strndup(erVersion, erVersionLen);
ch->notify_cb(ch, EdgeRouterConnected, ch->notify_ctx);
}
else {
} else {
ZITI_LOG(ERROR, "channel[%d] connect rejected: %d %*s", ch->id, success, msg->header.body_len, msg->body);
ch->state = Closed;
cb_code = ZITI_GATEWAY_UNAVAILABLE;
Expand All @@ -537,8 +540,7 @@ static void hello_reply_cb(void *ctx, message *msg) {
ch->latency_timer.data = ch;
uv_unref((uv_handle_t *) &ch->latency_timer);
uv_timer_start(&ch->latency_timer, send_latency_probe, 0, 60 * 1000);
}
else {
} else {
reconnect_channel(ch);
}
}
Expand Down Expand Up @@ -568,17 +570,23 @@ static void async_write(uv_async_t *ar) {

static void reconnect_cb(uv_timer_t *t) {
ziti_channel_t *ch = t->data;
ziti_context ztx = ch->ctx;

ch->msg_seq = 0;
if (ztx->session == NULL || ztx->session->token == NULL) {
ZITI_LOG(ERROR, "ziti context is not authenticated, delaying re-connect");
reconnect_channel(ch);
} else {
ch->msg_seq = 0;

uv_connect_t *req = calloc(1, sizeof(uv_connect_t));
req->data = ch;
uv_connect_t *req = calloc(1, sizeof(uv_connect_t));
req->data = ch;

ch->state = Connecting;
ch->state = Connecting;

uv_mbed_init(ch->loop, &ch->connection, ch->connection.tls);
ch->connection._stream.data = ch;
uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal);
uv_mbed_init(ch->loop, &ch->connection, ch->connection.tls);
ch->connection._stream.data = ch;
uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal);
}
uv_close((uv_handle_t *) t, (uv_close_cb) free);
}

Expand Down Expand Up @@ -620,6 +628,7 @@ static void on_channel_close(ziti_channel_t *ch, ssize_t code) {

if (ch->state != Closed) {
reconnect_channel(ch);
ziti_force_session_refresh(ch->ctx);
}
}

Expand Down Expand Up @@ -663,12 +672,10 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
on_channel_close(ch, len);

}
}
else if (len == 0) {
} else if (len == 0) {
// sometimes SSL message has no payload
free(buf->base);
}
else {
} else {
ZITI_LOG(TRACE, "ch[%d] on_data [len=%zd]", ch->id, len);
if (len > 0) {
buffer_append(ch->incoming, buf->base, (uint32_t) len);
Expand All @@ -689,8 +696,7 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
uv_mbed_keepalive(mbed, 1, ch->ctx->opts->router_keepalive);
}
send_hello(ch);
}
else {
} else {
ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->name, status);

for (int i = 0; i < ch->conn_reqs_n; i++) {
Expand Down
6 changes: 6 additions & 0 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ziti_conn_req {
ziti_dial_opts *dial_opts;
ziti_listen_opts *listen_opts;

struct waiter_s *waiter;
int retry_count;
uv_timer_t *conn_timeout;
bool failed;
Expand Down Expand Up @@ -126,6 +127,7 @@ int close_conn_internal(struct ziti_conn *conn) {
}

if (conn->conn_req) {
ziti_channel_remove_waiter(conn->channel, conn->conn_req->waiter);
free_conn_req(conn->conn_req);
}

Expand Down Expand Up @@ -758,6 +760,8 @@ void connect_reply_cb(void *ctx, message *msg) {
uv_timer_stop(req->conn_timeout);
}

req->waiter = NULL;

switch (msg->header.content) {
case ContentTypeStateClosed:
if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) {
Expand Down Expand Up @@ -948,6 +952,8 @@ int ziti_channel_start_connection(struct ziti_conn *conn) {
}
break;
}

req->waiter =
ziti_channel_send_for_reply(ch, content_type, headers, nheaders, conn->token, strlen(conn->token),
connect_reply_cb, conn);

Expand Down
25 changes: 17 additions & 8 deletions library/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const char* ziti_git_commit() {
return to_str(ZITI_COMMIT);
}

int ziti_log_level = ZITI_LOG_DEFAULT_LEVEL;
static int ziti_log_lvl = ZITI_LOG_DEFAULT_LEVEL;
static FILE *ziti_debug_out;
static bool log_initialized = false;

Expand Down Expand Up @@ -128,7 +128,7 @@ void ziti_log_init(uv_loop_t *loop, int level, log_writer log_func) {
init_debug(loop);

if (level == ZITI_LOG_DEFAULT_LEVEL)
level = ziti_log_level; // in case it was set before
level = ziti_log_lvl; // in case it was set before

ziti_log_set_level(level);

Expand All @@ -144,15 +144,24 @@ void ziti_log_set_level(int level) {
if (level == ZITI_LOG_DEFAULT_LEVEL) {
char *lvl = getenv("ZITI_LOG");
if (lvl != NULL) {
ziti_log_level = (int) strtol(lvl, NULL, 10);
ziti_log_lvl = (int) strtol(lvl, NULL, 10);
} else {
ziti_log_level = INFO;
ziti_log_lvl = INFO;
}
} else {
ziti_log_level = level;
ziti_log_lvl = level;
}

uv_mbed_set_debug(ziti_log_level, uv_mbed_logger);
uv_mbed_set_debug(ziti_log_lvl, uv_mbed_logger);
if (logger) {
char msg[128];
int len = snprintf(msg, sizeof(msg), "set log level: ziti_log_lvl=%d &ziti_log_lvl = %p", ziti_log_lvl, &ziti_log_lvl);
logger(INFO, "ziti_log_set_level", msg, len);
}
}

int ziti_log_level() {
return ziti_log_lvl;
}

void ziti_log_set_logger(log_writer log) {
Expand All @@ -170,7 +179,7 @@ static void init_debug(uv_loop_t *loop) {
}
ts_loop = loop;
log_initialized = true;
ziti_log_set_level(ziti_log_level);
ziti_log_set_level(ziti_log_lvl);
ziti_debug_out = stderr;

starttime = uv_now(loop);
Expand Down Expand Up @@ -277,7 +286,7 @@ int lt_zero(int v) { return v < 0; }
int non_zero(int v) { return v != 0; }

void hexDump (char *desc, void *addr, int len) {
if (DEBUG > ziti_log_level) return;
if (DEBUG > ziti_log_level()) return;
ZITI_LOG(DEBUG, " ");
int i;
unsigned char buffLine[17];
Expand Down
Loading

0 comments on commit 68871fc

Please sign in to comment.