Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement more robust ziti_channel re-connect #736

Merged
merged 6 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if (NOT TARGET tlsuv)
else ()
# Fetch tlsuv from upstream, pinned to a single release tag.
FetchContent_Declare(tlsuv
        GIT_REPOSITORY https://github.com/openziti/tlsuv.git
        GIT_TAG v0.32.2
        )
FetchContent_MakeAvailable(tlsuv)
endif (tlsuv_DIR)
Expand Down
125 changes: 60 additions & 65 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#include "zt_internal.h"
#include "utils.h"
#include "endian_internal.h"

#if _WIN32
#include "win32_compat.h"
#endif

#ifndef MAXHOSTNAMELEN
#define MAXHOSTNAMELEN 255
Expand All @@ -35,7 +38,7 @@
#define POOLED_MESSAGE_SIZE (32 * 1024)
#define INBOUND_POOL_SIZE (32)

#define CH_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "ch[%d] " fmt, ch->id, ##__VA_ARGS__)

Check warning on line 41 in library/channel.c

View workflow job for this annotation

GitHub Actions / Linux ARM

format '%ld' expects argument of type 'long int', but argument 10 has type 'size_t' {aka 'unsigned int'} [-Wformat=]

enum ChannelState {
Initial,
Expand Down Expand Up @@ -67,7 +70,7 @@

static void reconnect_cb(uv_timer_t *t);

static void on_channel_connect_internal(uv_connect_t *req, int status);
static void on_tls_connect(uv_connect_t *req, int status);

static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id);

Expand All @@ -85,10 +88,12 @@
static void on_tls_close(uv_handle_t *s);

/**
 * Start closing the channel's current TLS stream, if it has one.
 *
 * The stream is detached from the channel first (ch->connection is reset to
 * NULL) and only then handed to tlsuv_stream_close() with on_tls_close as the
 * close callback. Because the pointer is cleared up front, calling this again
 * before the close completes finds no stream and does nothing.
 *
 * @param ch channel whose connection should be closed
 */
static inline void close_connection(ziti_channel_t *ch) {
    tlsuv_stream_t *tls = ch->connection;
    ch->connection = NULL;

    if (tls) {
        // %p requires a void pointer argument
        CH_LOG(DEBUG, "closing TLS[%p]", (void *) tls);
        tlsuv_stream_close(tls, on_tls_close);
    }
}

Expand All @@ -102,7 +107,7 @@
};

struct msg_receiver {
int id;
uint32_t id;
void *receiver;

void (*receive)(void *receiver, message *m, int code);
Expand Down Expand Up @@ -136,7 +141,7 @@
return 0;
}

static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id, tls_context *tls) {
static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id) {
ch->ztx = ctx;
ch->loop = ctx->loop;
ch->id = id;
Expand Down Expand Up @@ -186,7 +191,7 @@
const char *url;
model_list ch_ids = {0};
MODEL_MAP_FOR(it, ztx->channels) {
model_list_append(&ch_ids, model_map_it_key(it));

Check warning on line 194 in library/channel.c

View workflow job for this annotation

GitHub Actions / Linux ARM

passing argument 2 of 'model_list_append' discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]

Check warning on line 194 in library/channel.c

View workflow job for this annotation

GitHub Actions / Linux ARM64

passing argument 2 of 'model_list_append' discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]

Check warning on line 194 in library/channel.c

View workflow job for this annotation

GitHub Actions / Linux x86_64

passing argument 2 of 'model_list_append' discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]

Check warning on line 194 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

passing 'const char *' to parameter of type 'void *' discards qualifiers [-Wincompatible-pointer-types-discards-qualifiers]

Check warning on line 194 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'const char *' to parameter of type 'void *' discards qualifiers [-Wincompatible-pointer-types-discards-qualifiers]
}

MODEL_LIST_FOR(it, ch_ids) {
Expand All @@ -203,16 +208,6 @@

/**
 * Close callback for a channel TLS stream: releases the stream and its memory.
 *
 * The callback intentionally does not read tls->data or touch the channel.
 * It runs after the close was requested, and by then the channel's
 * connection pointer has already been cleared by close_connection(); the
 * channel may also have been given a new stream or been freed in the meantime.
 * Reconnect scheduling is left to on_channel_close().
 *
 * @param s the closed handle; it is cast to the tlsuv_stream_t that was
 *          passed to tlsuv_stream_close()
 */
static void on_tls_close(uv_handle_t *s) {
    tlsuv_stream_t *tls = (tlsuv_stream_t *) s;

    tlsuv_stream_free(tls);
    free(tls);
}
Expand All @@ -228,8 +223,6 @@
uv_close((uv_handle_t *) ch->timer, (uv_close_cb) free);
ch->timer = NULL;

close_connection(ch);

ziti_channel_free(ch);
free(ch);
}
Expand Down Expand Up @@ -265,11 +258,11 @@

static ziti_channel_t *new_ziti_channel(ziti_context ztx, const char *ch_name, const char *url) {
ziti_channel_t *ch = calloc(1, sizeof(ziti_channel_t));
ziti_channel_init(ztx, ch, channel_counter++, ztx->tlsCtx);
ziti_channel_init(ztx, ch, channel_counter++);
const ziti_identity *identity = ziti_get_identity(ztx);
ch->name = strdup(ch_name);
ch->url = strdup(url);
CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, ziti_get_identity(ztx)->name);
CH_LOG(INFO, "(%s) new channel for ztx[%d] identity[%s]", ch->name, ztx->id, identity->name);

struct tlsuv_url_s ingress;
tlsuv_parse_url(&ingress, url);
Expand Down Expand Up @@ -329,7 +322,7 @@

const char* token = ziti_get_api_session_token(ch->ztx);
ziti_channel_send_for_reply(ch, ContentTypeUpdateTokenType,
NULL, 0, token, strlen(token), token_update_cb, ch);

Check warning on line 325 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

passing 'const char *' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]

Check warning on line 325 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'const char *' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]
return ZITI_OK;
}

Expand Down Expand Up @@ -401,7 +394,9 @@

if (status < 0) {
CH_LOG(ERROR, "write failed [%d/%s]", status, uv_strerror(status));
on_channel_close(ch, ZITI_CONNABORT, status);
if (ch->out_q == 0) {
on_channel_close(ch, ZITI_CONNABORT, status);
}
}

free(w);
Expand Down Expand Up @@ -652,7 +647,6 @@
ch->latency_waiter = NULL;
ch->latency = UINT64_MAX;

close_connection(ch);
on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT);
}
}
Expand Down Expand Up @@ -703,15 +697,12 @@
ch->notify_cb(ch, EdgeRouterConnected, ch->notify_ctx);
ch->latency = uv_now(ch->loop) - ch->latency;
uv_timer_start(ch->timer, send_latency_probe, LATENCY_INTERVAL, 0);
}
else {
if (msg)
} else {
if (msg) {
CH_LOG(ERROR, "connect rejected: %d %*s", success, msg->header.body_len, msg->body);
}

ch->state = Disconnected;
ch->notify_cb(ch, EdgeRouterUnavailable, ch->notify_ctx);
close_connection(ch);
reconnect_channel(ch, false);
on_channel_close(ch, ZITI_CONNABORT, 0);
}
}

Expand All @@ -730,7 +721,7 @@
},
};
ch->latency = uv_now(ch->loop);
ziti_channel_send_for_reply(ch, ContentTypeHelloType, headers, 2, ch->token, strlen(ch->token), hello_reply_cb, ch);

Check warning on line 724 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

passing 'char[37]' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]

Check warning on line 724 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'char[37]' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]
}


Expand All @@ -738,17 +729,12 @@
ziti_channel_t *ch = t->data;
CH_LOG(ERROR, "connect timeout");

if (ch->state == Closed) {
return;
}

ch->state = Disconnected;
if (ch->connection->conn_req == NULL) {
if (ch->connection && ch->connection->conn_req == NULL) {
// diagnostics
CH_LOG(WARN, "diagnostics: no conn_req in connect timeout");
}
reconnect_channel(ch, false);
close_connection(ch);

on_channel_close(ch, ZITI_TIMEOUT, UV_ETIMEDOUT);
}

static void reconnect_cb(uv_timer_t *t) {
Expand All @@ -773,9 +759,9 @@

CH_LOG(DEBUG, "connecting to %s", ch->url);

int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_channel_connect_internal);
int rc = tlsuv_stream_connect(req, ch->connection, ch->host, ch->port, on_tls_connect);
if (rc != 0) {
on_channel_connect_internal(req, rc);
on_tls_connect(req, rc);
} else {
uv_timer_start(ch->timer, ch_connect_timeout, CONNECT_TIMEOUT, 0);
}
Expand Down Expand Up @@ -852,22 +838,24 @@
ch->in_next = NULL;
}

close_connection(ch);

if (ziti_err == ZITI_DISABLED || ziti_err == ZITI_GATEWAY_UNAVAILABLE) {
return;
}

if (ch->state != Closed) {
if (uv_err == UV_EOF) {
ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session");
ziti_force_api_session_refresh(ch->ztx);
}
reconnect_channel(ch, false);
if (uv_err == UV_EOF) {
ZTX_LOG(VERBOSE, "edge router closed connection, trying to refresh api session");
ziti_force_api_session_refresh(ch->ztx);
}

reconnect_channel(ch, ch->reconnect);
ch->reconnect = false;
}

static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
tlsuv_stream_t *mbed = (tlsuv_stream_t *) handle;
ziti_channel_t *ch = mbed->data;
tlsuv_stream_t *tls = (tlsuv_stream_t *) handle;
ziti_channel_t *ch = tls->data;
if (ch->in_next || pool_has_available(ch->in_msg_pool)) {
buf->base = (char *) malloc(suggested_size);
if (buf->base == NULL) {
Expand All @@ -885,11 +873,11 @@
}

static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
tlsuv_stream_t *ssl = (tlsuv_stream_t *) s;
ziti_channel_t *ch = ssl->data;
tlsuv_stream_t *tls = (tlsuv_stream_t *) s;
ziti_channel_t *ch = tls->data;

if (len == UV_ENOBUFS) {
tlsuv_stream_read_stop(ssl);
tlsuv_stream_read_stop(tls);
CH_LOG(VERBOSE, "blocked until messages are processed");
return;
}
Expand All @@ -899,7 +887,6 @@
CH_LOG(INFO, "channel disconnected [%zd/%s]", len, uv_strerror(len));
// propagate close
on_channel_close(ch, ZITI_CONNABORT, len);
close_connection(ch);
return;
}

Expand All @@ -912,13 +899,29 @@

CH_LOG(TRACE, "on_data [len=%zd]", len);
ch->last_read = uv_now(ch->loop);
buffer_append(ch->incoming, buf->base, (uint32_t) len);

Check warning on line 902 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'char *const' to parameter of type 'uint8_t *' (aka 'unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]
process_inbound(ch);
}

static void on_channel_connect_internal(uv_connect_t *req, int status) {
static void on_tls_connect(uv_connect_t *req, int status) {
tlsuv_stream_t *tls = (tlsuv_stream_t *)req->handle;

// connect request was cancelled via tlsuv_stream_close
// cleanup in close callback
if (status == UV_ECANCELED || tls->data == NULL) {
goto done;
}

ziti_channel_t *ch = tls->data;
assert(ch);

if (tls != ch->connection) {
scareything marked this conversation as resolved.
Show resolved Hide resolved
// this should never happen but handle it anyway -- close connected tls stream
CH_LOG(ERROR, "invalid state, mismatch req->conn[%p] != ch->conn[%p]", tls, ch->connection);
tls->data = NULL;
tlsuv_stream_close(tls, on_tls_close);
goto done;
}

if (status == 0) {
const char *token = ziti_get_api_session_token(ch->ztx);
Expand All @@ -930,21 +933,13 @@
send_hello(ch, token);
} else {
CH_LOG(WARN, "api session invalidated, while connecting");
close_connection(ch);
reconnect_channel(ch, false);
on_channel_close(ch, ZITI_CONNABORT, 0);
}
} else if (ch != NULL) {
} else {
CH_LOG(ERROR, "failed to connect to ER[%s] [%d/%s]", ch->name, status, uv_strerror(status));

if (status != UV_ECANCELED) {
close_connection(ch);
}

if (ch->state != Closed) {
ch->state = Disconnected;
reconnect_channel(ch, false);
}
on_channel_close(ch, ZITI_CONNABORT, status);
}
done:
free(req);
}

Expand Down
Loading