From 6ee97bd322f92847f94e9a56c40b0a2614eaccaf Mon Sep 17 00:00:00 2001 From: ekoby <7406535+ekoby@users.noreply.github.com> Date: Mon, 21 Dec 2020 15:35:28 -0500 Subject: [PATCH] Ziti conn improvement (#206) * improve ziti_connection lifecycle [fixes #154] - ziti_close() now takes a callback - avoid crash if we receive close before connect reply --- inc_internal/zt_internal.h | 30 ++-- includes/ziti/ziti.h | 15 +- library/channel.c | 16 +- library/connect.c | 233 +++++++++++++++++++++++------ library/ziti.c | 87 ++++------- library/ziti_src.c | 2 +- programs/sample-host/sample-host.c | 8 +- programs/sample_wttr/sample_wttr.c | 4 +- programs/wzcat/ziti_ws.c | 4 +- programs/ziti-prox-c/proxy.c | 22 +-- tests/model_tests.cpp | 1 + tests/test_ziti_model.cpp | 4 +- 12 files changed, 282 insertions(+), 144 deletions(-) diff --git a/inc_internal/zt_internal.h b/inc_internal/zt_internal.h index 253c4654..da33d5c3 100644 --- a/inc_internal/zt_internal.h +++ b/inc_internal/zt_internal.h @@ -38,6 +38,8 @@ limitations under the License. #define UUID_STR_LEN 37 #endif +#define TYPE_DIAL "Dial" +#define TYPE_BIND "Bind" typedef struct ziti_channel ziti_channel_t; @@ -49,18 +51,8 @@ typedef void (*ch_connect_cb)(ziti_channel_t *ch, void *ctx, int status); typedef void (*ch_notify_state)(ziti_channel_t *ch, ziti_router_status status, void *ctx); -enum conn_state { - Initial, - Connecting, - Connected, - Binding, - Bound, - Accepting, - Timedout, - CloseWrite, - Disconnected, - Closed -}; +typedef int ch_state; +typedef int conn_state; typedef struct ziti_channel { uv_loop_t *loop; @@ -77,7 +69,7 @@ typedef struct ziti_channel { uint64_t latency; uv_timer_t latency_timer; - enum conn_state state; + ch_state state; uint32_t reconnect_count; struct ch_conn_req **conn_reqs; @@ -122,9 +114,11 @@ struct ziti_conn { ziti_channel_t *channel; ziti_data_cb data_cb; ziti_client_cb client_cb; - enum conn_state state; + ziti_close_cb close_cb; + conn_state state; bool fin_sent; bool fin_recv; + bool close; int timeout; buffer *inbound; @@ -205,12 +199,16 @@ struct ziti_ctx { extern "C" { #endif +void ziti_invalidate_session(ziti_context ztx, ziti_net_session *session, const char *service_id, const char *type); + void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx); void ziti_force_session_refresh(ziti_context ztx); int ziti_close_channels(ziti_context ztx); +bool ziti_channel_is_connected(ziti_channel_t *ch); + int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx); int ziti_channel_close(ziti_channel_t *ch); @@ -243,12 +241,12 @@ void conn_inbound_data_msg(ziti_connection conn, message *msg); int ziti_write_req(struct ziti_write_req_s *req); -int ziti_disconnect(struct ziti_conn *conn); - void on_write_completed(struct ziti_conn *conn, struct ziti_write_req_s *req, int status); int close_conn_internal(struct ziti_conn *conn); +const char* ziti_conn_state(ziti_connection conn); + int establish_crypto(ziti_connection conn, message *msg); void ziti_fmt_time(char *time_str, size_t time_str_len, uv_timeval64_t *tv); diff --git a/includes/ziti/ziti.h b/includes/ziti/ziti.h index 76f1b53e..e3135245 100644 --- a/includes/ziti/ziti.h +++ b/includes/ziti/ziti.h @@ -378,6 +378,15 @@ typedef void (*ziti_write_cb)(ziti_connection conn, ssize_t status, void *write_ */ typedef void (*ziti_enroll_cb)(ziti_config *cfg, int status, char *err_message, void *enroll_ctx); +/** + * @brief Callback called after connection was closed. + * + * @param conn connection that was closed + * + * @see ziti_close() + */ +typedef void (*ziti_close_cb)(ziti_connection conn); + /** * @brief Performs a Ziti enrollment. * @@ -689,12 +698,16 @@ extern int ziti_accept(ziti_connection clt, ziti_conn_cb cb, ziti_data_cb data_c * When no longer needed a [connection](#ziti_connection) should be closed to gracefully disconnect. This * function should be invoked after any status is returned which indicates an error situation. * + * This method initiates the disconnect(if needed) and the release all associated resources. + * After close_cb() is called, the ziti_connection handle is no longer valid. + * * @param conn the #ziti_connection to be closed + * @param close_cb callback called after connection is closed * * @return #ZITI_OK or corresponding #ZITI_ERRORS */ ZITI_FUNC -extern int ziti_close(ziti_connection *conn); +extern int ziti_close(ziti_connection conn, ziti_close_cb close_cb); /** * @brief Closes the outgoing (write) side of the given ziti connection. diff --git a/library/channel.c b/library/channel.c index ca09d94b..549f0323 100644 --- a/library/channel.c +++ b/library/channel.c @@ -30,6 +30,14 @@ limitations under the License. #define BACKOFF_TIME 3000 /* 3 seconds */ #define MAX_BACKOFF 5 /* max reconnection timeout: (1 << 5) * BACKOFF_TIME = 96 seconds */ +enum ChannelState { + Initial, + Connecting, + Connected, + Disconnected, + Closed, +}; + static void reconnect_channel(ziti_channel_t *ch); static void on_channel_connect_internal(uv_connect_t *req, int status); @@ -119,7 +127,7 @@ int ziti_close_channels(struct ziti_ctx *ziti) { return ZITI_OK; } -void ziti_close_cb(uv_handle_t *h) { +static void close_handle_cb(uv_handle_t *h) { uv_mbed_t *mbed = (uv_mbed_t *) h; ziti_channel_t *ch = mbed->_stream.data; @@ -132,7 +140,7 @@ int ziti_channel_close(ziti_channel_t *ch) { int r = 0; if (ch->state != Closed) { ZITI_LOG(INFO, "closing ch[%d](%s)", ch->id, ch->name); - r = uv_mbed_close(&ch->connection, ziti_close_cb); + r = uv_mbed_close(&ch->connection, close_handle_cb); uv_timer_stop(&ch->latency_timer); uv_close((uv_handle_t *) &ch->latency_timer, NULL); ch->state = Closed; @@ -160,6 +168,10 @@ void ziti_channel_rem_receiver(ziti_channel_t *ch, int id) { } } +bool ziti_channel_is_connected(ziti_channel_t *ch) { + return ch->state == Connected; +} + int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url, ch_connect_cb cb, void *cb_ctx) { ziti_channel_t *ch = model_map_get(&ztx->channels, ch_name); diff --git a/library/connect.c b/library/connect.c index 0c8cc7a7..05a5100b 100644 --- a/library/connect.c +++ b/library/connect.c @@ -22,13 +22,35 @@ limitations under the License. #include "endian_internal.h" #include "win32_compat.h" -static const char *TYPE_BIND = "Bind"; -static const char *TYPE_DIAL = "Dial"; static const char *INVALID_SESSION = "Invalid Session"; static const int MAX_CONNECT_RETRY = 3; #define crypto(func) crypto_secretstream_xchacha20poly1305_##func +#define conn_states(XX) \ + XX(Initial)\ + XX(Connecting)\ + XX(Connected)\ + XX(Binding)\ + XX(Bound)\ + XX(Accepting)\ + XX(Timedout)\ + XX(CloseWrite)\ + XX(Disconnected)\ + XX(Closed) + +enum conn_state { +#define state_enum(ST) ST, + conn_states(state_enum) +}; + +static const char* conn_state_str[] = { +#define state_str(ST) #ST , + conn_states(state_str) +}; + + + struct ziti_conn_req { const char *session_type; char *service_id; @@ -37,9 +59,9 @@ struct ziti_conn_req { ziti_dial_opts *dial_opts; ziti_listen_opts *listen_opts; - struct waiter_s *waiter; int retry_count; uv_timer_t *conn_timeout; + struct waiter_s *waiter; bool failed; }; @@ -51,12 +73,21 @@ static int send_fin_message(ziti_connection conn); static void process_edge_message(struct ziti_conn *conn, message *msg, int code); -int ziti_channel_start_connection(struct ziti_conn *conn); +static int ziti_channel_start_connection(struct ziti_conn *conn); static void free_handle(uv_handle_t *h) { free(h); } +const char* ziti_conn_state(ziti_connection conn) { + return conn ? conn_state_str[conn->state] : ""; +} + +static void conn_set_state(struct ziti_conn *conn, enum conn_state state) { + ZITI_LOG(VERBOSE, "conn[%d] transitioning %s => %s", conn->conn_id, conn_state_str[conn->state], conn_state_str[state]); + conn->state = state; +} + static ziti_dial_opts *clone_ziti_dial_opts(const ziti_dial_opts *dial_opts) { if (dial_opts == NULL) { ZITI_LOG(DEBUG, "refuse to clone NULL dial_opts"); @@ -122,6 +153,10 @@ static void free_conn_req(struct ziti_conn_req *r) { int close_conn_internal(struct ziti_conn *conn) { if (conn->state == Closed && conn->write_reqs == 0) { ZITI_LOG(DEBUG, "removing conn_id[%d]", conn->conn_id); + if (conn->close_cb) { + conn->close_cb(conn); + } + if (conn->channel) { ziti_channel_rem_receiver(conn->channel, conn->conn_id); } @@ -171,16 +206,16 @@ void on_write_completed(struct ziti_conn *conn, struct ziti_write_req_s *req, in uv_close((uv_handle_t *) req->timeout, free_handle); } + if (status < 0) { + conn_set_state(conn, Disconnected); + ZITI_LOG(DEBUG, "conn[%d] is now Disconnected due to write failure: %d", conn->conn_id, status); + } + if (req->cb != NULL) { if (status == 0) { status = req->len; } - if (status < 0) { - conn->state = Closed; - ZITI_LOG(TRACE, "connection[%d] state is now Closed", conn->conn_id); - } - req->cb(conn, status, req->ctx); } @@ -263,7 +298,7 @@ static void connect_timeout(uv_timer_t *timer) { if (conn->state == Connecting) { ZITI_LOG(WARN, "ziti connection timed out"); - conn->state = Timedout; + conn_set_state(conn, Timedout); complete_conn_req(conn, ZITI_TIMEOUT); } else { @@ -446,8 +481,7 @@ static int do_ziti_dial(ziti_connection conn, const char *service, ziti_dial_opt } conn->data_cb = data_cb; - conn->state = Connecting; - + conn_set_state(conn, Connecting); NEWP(async_cr, uv_async_t); uv_async_init(conn->ziti_ctx->loop, async_cr, ziti_connect_async); @@ -483,8 +517,8 @@ static void ziti_write_timeout(uv_timer_t *t) { req->timeout = NULL; req->conn = NULL; - if (conn->state != Closed) { - conn->state = Closed; + if (conn->state < Disconnected) { + conn_set_state(conn, Disconnected); req->cb(conn, ZITI_TIMEOUT, req->ctx); } @@ -495,7 +529,7 @@ static void ziti_write_async(uv_async_t *ar) { struct ziti_write_req_s *req = ar->data; struct ziti_conn *conn = req->conn; - if (conn->state == Closed) { + if (conn->state >= Disconnected) { ZITI_LOG(WARN, "got write req for closed conn[%d]", conn->conn_id); conn->write_reqs--; @@ -539,7 +573,7 @@ int ziti_write_req(struct ziti_write_req_s *req) { } static void ziti_disconnect_cb(ziti_connection conn, ssize_t status, void *ctx) { - conn->state = Closed; + conn_set_state(conn, conn->close ? Closed : Disconnected); } static void ziti_disconnect_async(uv_async_t *ar) { @@ -547,6 +581,7 @@ static void ziti_disconnect_async(uv_async_t *ar) { switch (conn->state) { case Bound: case Accepting: + case Connecting: case Connected: case CloseWrite: { NEWP(wr, struct ziti_write_req_s); @@ -558,20 +593,21 @@ static void ziti_disconnect_async(uv_async_t *ar) { } default: - ZITI_LOG(DEBUG, "conn[%d] can't send StateClosed in state[%d]", conn->conn_id, conn->state); + ZITI_LOG(DEBUG, "conn[%d] can't send StateClosed in state[%s]", conn->conn_id, conn_state_str[conn->state]); + ziti_disconnect_cb(conn, 0, NULL); } } -int ziti_disconnect(struct ziti_conn *conn) { - NEWP(ar, uv_async_t); - if (conn->channel) { +static int ziti_disconnect(struct ziti_conn *conn) { + if (conn->state < Disconnected) { + NEWP(ar, uv_async_t); uv_async_init(conn->ziti_ctx->loop, ar, ziti_disconnect_async); ar->data = conn; conn->disconnector = ar; return uv_async_send(conn->disconnector); } else { - conn->state = Closed; + conn_set_state(conn, conn->close ? Closed : Disconnected); } return ZITI_OK; } @@ -579,7 +615,7 @@ int ziti_disconnect(struct ziti_conn *conn) { static void crypto_wr_cb(ziti_connection conn, ssize_t status, void *ctx) { if (status < 0) { ZITI_LOG(ERROR, "crypto header write failed with status[%zd]", status); - conn->state = Closed; + conn_set_state(conn, Disconnected); conn->data_cb(conn, NULL, status); } } @@ -642,7 +678,7 @@ static int send_crypto_header(ziti_connection conn) { static void flush_to_client(uv_check_t *fl) { ziti_connection conn = fl->data; - if (conn == NULL || conn->state == Closed) { + if (conn == NULL || conn->state >= Disconnected) { uv_check_stop(fl); return; } @@ -680,7 +716,7 @@ static void flush_to_client(uv_check_t *fl) { void conn_inbound_data_msg(ziti_connection conn, message *msg) { uint8_t *plain_text = NULL; - if (conn->state == Closed || conn->fin_recv) { + if (conn->state >= Disconnected || conn->fin_recv) { ZITI_LOG(WARN, "inbound data on closed connection"); return; } @@ -711,7 +747,7 @@ void conn_inbound_data_msg(ziti_connection conn, message *msg) { CATCH(crypto) { FREE(plain_text); - conn->state = Closed; + conn_set_state(conn, Disconnected); conn->data_cb(conn, NULL, ZITI_CRYPTO_FAIL); return; } @@ -788,7 +824,7 @@ void connect_reply_cb(void *ctx, message *msg) { ZITI_LOG(ERROR, "edge conn_id[%d]: failed to %s, reason=%*.*s", conn->conn_id, conn->state == Binding ? "bind" : "connect", msg->header.body_len, msg->header.body_len, msg->body); - conn->state = Closed; + conn_set_state(conn, Disconnected); complete_conn_req(conn, ZITI_CONN_CLOSED); } break; @@ -800,12 +836,12 @@ void connect_reply_cb(void *ctx, message *msg) { if (rc == ZITI_OK && conn->encrypted) { send_crypto_header(conn); } - conn->state = rc == ZITI_OK ? Connected : Closed; + conn_set_state(conn, rc == ZITI_OK ? Connected : Disconnected); complete_conn_req(conn, rc); } else if (conn->state == Binding) { ZITI_LOG(TRACE, "edge conn_id[%d]: bound.", conn->conn_id); - conn->state = Bound; + conn_set_state(conn, Bound); complete_conn_req(conn, ZITI_OK); } else if (conn->state == Accepting) { @@ -813,10 +849,10 @@ void connect_reply_cb(void *ctx, message *msg) { if (conn->encrypted) { send_crypto_header(conn); } - conn->state = Connected; + conn_set_state(conn, Connected); complete_conn_req(conn, ZITI_OK); } - else if (conn->state == Closed || conn->state == Timedout) { + else if (conn->state >= Timedout) { ZITI_LOG(WARN, "received connect reply for closed/timedout connection[%d]", conn->conn_id); ziti_disconnect(conn); } @@ -828,7 +864,7 @@ void connect_reply_cb(void *ctx, message *msg) { } } -int ziti_channel_start_connection(struct ziti_conn *conn) { +static int ziti_channel_start_connection(struct ziti_conn *conn) { struct ziti_conn_req *req = conn->conn_req; ziti_channel_t *ch = conn->channel; @@ -843,7 +879,7 @@ int ziti_channel_start_connection(struct ziti_conn *conn) { case Connecting: content_type = ContentTypeConnect; break; - case Closed: + case Disconnected: ZITI_LOG(WARN, "channel did not connect in time for connection[%d]. ", conn->conn_id); return ZITI_OK; default: @@ -979,7 +1015,7 @@ int ziti_bind(ziti_connection conn, const char *service, ziti_listen_opts *liste } conn->client_cb = on_clt_cb; - conn->state = Binding; + conn_set_state(conn, Binding); NEWP(async_cr, uv_async_t); uv_async_init(conn->ziti_ctx->loop, async_cr, ziti_connect_async); @@ -999,7 +1035,7 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) { conn->flusher->data = conn; uv_unref((uv_handle_t *) &conn->flusher); - ziti_channel_add_receiver(ch, conn->conn_id, conn, process_edge_message); + ziti_channel_add_receiver(ch, conn->conn_id, conn, (void (*)(void *, message *, int)) process_edge_message); ZITI_LOG(TRACE, "ch[%d] => Edge Accept conn_id[%d] parent_conn_id[%d]", ch->id, conn->conn_id, conn->parent->conn_id); @@ -1031,12 +1067,33 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) { req->cb = cb; conn->conn_req = req; - ziti_channel_send_for_reply(ch, content_type, headers, 3, (const uint8_t *) &clt_conn_id, sizeof(clt_conn_id), - connect_reply_cb, conn); + req->waiter = ziti_channel_send_for_reply( + ch, content_type, headers, 3, + (const uint8_t *) &clt_conn_id, sizeof(clt_conn_id), + connect_reply_cb, conn); return ZITI_OK; } +int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_write_cb write_cb, void *write_ctx) { + if (conn->state != Connected) { + ZITI_LOG(ERROR, "attempted write on conn[%d] in invalid state[%d]", conn->conn_id, conn->state); + write_cb(conn, ZITI_INVALID_STATE, write_ctx); + return ZITI_INVALID_STATE; + } + + NEWP(req, struct ziti_write_req_s); + req->conn = conn; + req->buf = data; + req->len = length; + req->cb = write_cb; + req->ctx = write_ctx; + + metrics_rate_update(&conn->ziti_ctx->up_rate, length); + + return ziti_write_req(req); +} + static int send_fin_message(ziti_connection conn) { ziti_channel_t *ch = conn->channel; int32_t conn_id = htole32(conn->conn_id); @@ -1063,11 +1120,22 @@ static int send_fin_message(ziti_connection conn) { return ziti_channel_send(ch, ContentTypeData, headers, 3, NULL, 0, wr); } +int ziti_close(ziti_connection conn, ziti_close_cb close_cb) { + if (conn != NULL) { + conn->close = true; + conn->close_cb = close_cb; + ziti_disconnect(conn); + } + + return ZITI_OK; +} + + int ziti_close_write(ziti_connection conn) { - if (conn->fin_sent || conn->state == Closed) { + if (conn->fin_sent || conn->state >= Disconnected) { return ZITI_OK; } - conn->state = CloseWrite; + conn_set_state(conn, CloseWrite); if (conn->write_reqs == 0) { return send_fin_message(conn); } @@ -1078,12 +1146,12 @@ static void process_edge_message(struct ziti_conn *conn, message *msg, int code) if (msg == NULL) { ZITI_LOG(DEBUG, "conn[%d] is closed due to err[%d](%s)", conn->conn_id, code, ziti_errorstr(code)); + conn_set_state(conn, Disconnected); if (conn->state == Connected) { conn->data_cb(conn, NULL, code); } else if (conn->state == Bound) { conn->client_cb(conn, NULL, code); } - conn->state = Closed; return; } @@ -1097,14 +1165,56 @@ static void process_edge_message(struct ziti_conn *conn, message *msg, int code) switch (msg->header.content) { case ContentTypeStateClosed: - ZITI_LOG(VERBOSE, "connection status[%d] conn_id[%d] seq[%d]", msg->header.content, conn_id, seq); - if (conn->state == Bound) { - conn->client_cb(conn, NULL, ZITI_CONN_CLOSED); - } - else if (conn->state == Connected || conn->state == CloseWrite) { - conn->data_cb(conn, NULL, ZITI_CONN_CLOSED); + ZITI_LOG(DEBUG, "connection status[%d] conn_id[%d] seq[%d] err[%.*s]", msg->header.content, conn_id, seq, + msg->header.body_len, msg->body); + bool retry_connect = false; + + switch (conn->state) { + case Connecting: + case Accepting: + case Binding: { + if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) { + ZITI_LOG(WARN, "conn[%d] session for service[%s] became invalid", conn->conn_id, conn->service); + ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->session, + conn->conn_req->service_id, conn->conn_req->session_type); + conn->conn_req->session = NULL; + retry_connect = true; + } + if (retry_connect) { + ziti_channel_rem_receiver(conn->channel, conn->conn_id); + conn->channel = NULL; + conn_set_state(conn, Connecting); + restart_connect(conn); + } else { + ZITI_LOG(ERROR, "edge conn_id[%d]: failed to %s, reason=%*.*s", + conn->conn_id, conn->state == Binding ? "bind" : "connect", + msg->header.body_len, msg->header.body_len, msg->body); + conn_set_state(conn, Disconnected); + complete_conn_req(conn, ZITI_CONN_CLOSED); + } + break; + } + + case Bound: + conn_set_state(conn, Disconnected); + conn->client_cb(conn, NULL, ZITI_CONN_CLOSED); + break; + + case Connected: + case CloseWrite: + conn_set_state(conn, Disconnected); + conn->data_cb(conn, NULL, ZITI_CONN_CLOSED); + break; + + + default: + ZITI_LOG(WARN, "unexpected msg for conn[%d] state[%d]", conn->conn_id, conn->state); + conn_set_state(conn, Disconnected); + if (conn->data_cb) { + conn->data_cb(conn, NULL, ZITI_INVALID_STATE); + } + break; } - conn->state = Closed; break; case ContentTypeData: @@ -1134,7 +1244,7 @@ static void process_edge_message(struct ziti_conn *conn, message *msg, int code) if (caller_id_sent) { clt->source_identity = strndup((char *)source_identity, source_identity_sz); } - clt->state = Accepting; + conn_set_state(clt, Accepting); clt->parent = conn; clt->channel = conn->channel; clt->dial_req_seq = msg->header.seq; @@ -1145,6 +1255,35 @@ static void process_edge_message(struct ziti_conn *conn, message *msg, int code) conn->client_cb(conn, clt, ZITI_OK); break; + case ContentTypeStateConnected: + if (conn->state == Connecting) { + ZITI_LOG(TRACE, "edge conn_id[%d]: connected.", conn->conn_id); + int rc = establish_crypto(conn, msg); + if (rc == ZITI_OK && conn->encrypted) { + send_crypto_header(conn); + } + conn_set_state(conn, rc == ZITI_OK ? Connected : Disconnected); + complete_conn_req(conn, rc); + } + else if (conn->state == Binding) { + ZITI_LOG(TRACE, "edge conn_id[%d]: bound.", conn->conn_id); + conn_set_state(conn, Bound); + complete_conn_req(conn, ZITI_OK); + } + else if (conn->state == Accepting) { + ZITI_LOG(TRACE, "edge conn_id[%d]: accepted.", conn->conn_id); + if (conn->encrypted) { + send_crypto_header(conn); + } + conn_set_state(conn, Connected); + complete_conn_req(conn, ZITI_OK); + } + else if (conn->state >= Timedout) { + ZITI_LOG(WARN, "received connect reply for closed/timedout connection[%d]", conn->conn_id); + ziti_disconnect(conn); + } + break; + default: ZITI_LOG(ERROR, "conn[%d] received unexpected content_type[%d]", conn_id, msg->header.content); } diff --git a/library/ziti.c b/library/ziti.c index 7ac63cb1..e522002f 100644 --- a/library/ziti.c +++ b/library/ziti.c @@ -62,28 +62,6 @@ static void ziti_init_async(uv_async_t *ar); static void grim_reaper(uv_prepare_t *p); -#define CONN_STATES(XX) \ -XX(Initial)\ - XX(Connecting)\ - XX(Connected)\ - XX(Binding)\ - XX(Bound)\ - XX(Accepting) \ - XX(Closed) - -static const char *strstate(enum conn_state st) { -#define state_case(s) case s: return #s; - - switch (st) { - - CONN_STATES(state_case) - - default: - return ""; - } -#undef state_case -} - static size_t parse_ref(const char *val, const char **res) { size_t len = 0; *res = NULL; @@ -281,7 +259,6 @@ extern void *ziti_app_ctx(ziti_context ztx) { return ztx->opts->app_ctx; } - const char *ziti_get_controller(ziti_context ztx) { return ztx->opts->controller; } @@ -369,17 +346,18 @@ void ziti_dump(ziti_context ztx) { ziti_channel_t *ch; const char *url; MODEL_MAP_FOREACH(url, ch, &ztx->channels) { - printf("ch[%d](%s) %s\n", ch->id, url, ch->state == Disconnected ? "Disconnected" : ""); + printf("ch[%d](%s) %s\n", ch->id, url, ziti_channel_is_connected(ch) ? "" : "Disconnected"); } printf("\n==================\nConnections:\n"); ziti_connection conn; LIST_FOREACH(conn, &ztx->connections, next) { - printf("conn[%d]: state[%d] service[%s] using ch[%d] %s\n", - conn->conn_id, conn->state, conn->service, + printf("conn[%d]: state[%s] service[%s] using ch[%d] %s\n", + conn->conn_id, ziti_conn_state(conn), conn->service, conn->channel ? conn->channel->id : -1, conn->channel ? conn->channel->name : "(none)"); } + printf("\n==================\n\n"); } int ziti_conn_init(ziti_context ztx, ziti_connection *conn, void *data) { @@ -388,7 +366,6 @@ int ziti_conn_init(ziti_context ztx, ziti_connection *conn, void *data) { c->ziti_ctx = ztx; c->data = data; c->channel = NULL; - c->state = Initial; c->timeout = ctx->ziti_timeout; c->edge_msg_seq = 1; c->conn_id = ztx->conn_seq++; @@ -413,36 +390,6 @@ const char *ziti_conn_source_identity(ziti_connection conn) { return conn != NULL ? conn->source_identity : NULL; } -int ziti_close(ziti_connection *conn) { - struct ziti_conn *c = *conn; - - if (c != NULL) { - ziti_disconnect(c); - } - - *conn = NULL; - - return ZITI_OK; -} - -int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_write_cb write_cb, void *write_ctx) { - if (conn->state != Connected) { - ZITI_LOG(ERROR, "attempted write on conn[%d] in invalid state[%d]", conn->conn_id, conn->state); - write_cb(conn, ZITI_INVALID_STATE, write_ctx); - return ZITI_INVALID_STATE; - } - - NEWP(req, struct ziti_write_req_s); - req->conn = conn; - req->buf = data; - req->len = length; - req->cb = write_cb; - req->ctx = write_ctx; - - metrics_rate_update(&conn->ziti_ctx->up_rate, length); - - return ziti_write_req(req); -} static void ziti_send_event(ziti_context ztx, const ziti_event_t *e) { if ((ztx->opts->events & e->type) && ztx->opts->event_cb) { @@ -539,7 +486,8 @@ static void ziti_re_auth(ziti_context ztx) { ziti_ctrl_login(&ztx->controller, ztx->opts->config_types, session_cb, init_req); } -static void update_services(ziti_service_array services, ziti_error *error, ziti_context ztx) { +static void update_services(ziti_service_array services, ziti_error *error, void *ctx) { + ziti_context ztx = ctx; if (error) { ZITI_LOG(ERROR, "failed to get service updates err[%s/%s] from ctrl[%s]", error->code, error->message, ztx->opts->controller); @@ -761,6 +709,27 @@ static void version_cb(ziti_version *v, ziti_error *err, void *ctx) { } } +void ziti_invalidate_session(ziti_context ztx, ziti_net_session *session, const char *service_id, const char* type) { + if (session == NULL) { + return; + } + + if (strcmp(TYPE_DIAL, type) == 0) { + ziti_net_session *s = model_map_get(&ztx->sessions, service_id); + if (s != session) { + // already removed or different one + // passed reference is no longer valid + session = NULL; + } + else if (s == session) { + model_map_remove(&ztx->sessions, session->service_id); + } + } + + free_ziti_net_session(session); + FREE(session); +} + static const ziti_version sdk_version = { .version = to_str(ZITI_VERSION), .revision = to_str(ZITI_COMMIT), @@ -785,7 +754,7 @@ static void grim_reaper(uv_prepare_t *p) { count += close_conn_internal(try_close); } if (count > 0) { - ZITI_LOG(INFO, "reaped %d closed (out of %d total) connections", count, total); + ZITI_LOG(DEBUG, "reaped %d closed (out of %d total) connections", count, total); } } diff --git a/library/ziti_src.c b/library/ziti_src.c index 99d9cf2d..7a22959d 100644 --- a/library/ziti_src.c +++ b/library/ziti_src.c @@ -156,7 +156,7 @@ void zl_close(uv_link_t* link, uv_link_t* source, uv_link_close_cb link_close_cb ziti_link_t *zl = (ziti_link_t *)link; ZITI_LOG(TRACE, "%s", zl->service); - ziti_close(&zl->conn); + ziti_close(zl->conn, NULL); link_close_cb((uv_link_t *) zl); } diff --git a/programs/sample-host/sample-host.c b/programs/sample-host/sample-host.c index 48e717a5..662d7b5f 100644 --- a/programs/sample-host/sample-host.c +++ b/programs/sample-host/sample-host.c @@ -43,7 +43,7 @@ static ssize_t on_client_data(ziti_connection clt, uint8_t *data, ssize_t len) { } else if (len == ZITI_EOF) { printf("client disconnected\n"); - ziti_close(&clt); + ziti_close(clt, NULL); } else { fprintf(stderr, "error: %zd(%s)", len, ziti_errorstr(len)); @@ -82,7 +82,7 @@ static void listen_cb(ziti_connection serv, int status) { } else { printf("ERROR The Byte Counter could not be started: %d(%s)\n", status, ziti_errorstr(status)); - ziti_close(&serv); + ziti_close(serv, NULL); } } @@ -109,13 +109,13 @@ ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) { if (len == ZITI_EOF) { printf("request completed: %s\n", ziti_errorstr(len)); - ziti_close(&c); + ziti_close(c, NULL); ziti_shutdown(ziti); } else if (len < 0) { fprintf(stderr, "unexpected error: %s\n", ziti_errorstr(len)); - ziti_close(&c); + ziti_close(c, NULL); ziti_shutdown(ziti); } else { diff --git a/programs/sample_wttr/sample_wttr.c b/programs/sample_wttr/sample_wttr.c index 9b2686e7..83f240ad 100644 --- a/programs/sample_wttr/sample_wttr.c +++ b/programs/sample_wttr/sample_wttr.c @@ -35,13 +35,13 @@ ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) { if (len == ZITI_EOF) { printf("request completed: %s\n", ziti_errorstr(len)); - ziti_close(&c); + ziti_close(c, NULL); ziti_shutdown(ziti); } else if (len < 0) { fprintf(stderr, "unexpected error: %s\n", ziti_errorstr(len)); - ziti_close(&c); + ziti_close(c, NULL); ziti_shutdown(ziti); } else { diff --git a/programs/wzcat/ziti_ws.c b/programs/wzcat/ziti_ws.c index cccf3d22..94aac930 100644 --- a/programs/wzcat/ziti_ws.c +++ b/programs/wzcat/ziti_ws.c @@ -87,8 +87,8 @@ static void on_ws_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) { printf("< %.*s", (int)len, buf->base); } else if (len < 0) { fprintf(stderr, "=========================\nwebsocket error[%zd]: %s\n", len, ziti_errorstr(len)); - um_websocket_close(s, (uv_close_cb) free); - uv_close(&input, NULL); + um_websocket_close((um_websocket_t *) s, (uv_close_cb) free); + uv_close((uv_handle_t *) &input, NULL); ziti_shutdown(ctx); } } diff --git a/programs/ziti-prox-c/proxy.c b/programs/ziti-prox-c/proxy.c index 76f6e0cc..c604cc5f 100644 --- a/programs/ziti-prox-c/proxy.c +++ b/programs/ziti-prox-c/proxy.c @@ -164,6 +164,14 @@ static void close_cb(uv_handle_t *h) { free(h); } +static void on_ziti_close(ziti_connection conn) { + uv_stream_t *tcp = ziti_conn_data(conn); + struct client *clt = tcp->data; + ZITI_LOG(INFO, "ziti connection closed for clt[%s]", clt->addr_s); + clt->closed = true; + uv_close((uv_handle_t *) tcp, close_cb); +} + static void on_client_write(uv_write_t *req, int status) { uv_stream_t *s = req->handle; struct client *client = s->data; @@ -175,7 +183,7 @@ static void on_client_write(uv_write_t *req, int status) { ZITI_LOG(WARN, "write failed: [%d/%s](%s) -- closing client[%s]", status, uv_err_name(status), uv_strerror(status), client->addr_s); client->write_done = true; - ziti_close(&client->ziti_conn); + ziti_close(client->ziti_conn, on_ziti_close); break; } default: @@ -231,10 +239,7 @@ static void data_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { clt->read_done = true; if (clt->write_done) { ZITI_LOG(DEBUG, "closing client[%s]", clt->addr_s); - ziti_conn_set_data(clt->ziti_conn, NULL); - ziti_close(&clt->ziti_conn); - clt->closed = true; - uv_close((uv_handle_t *) stream, close_cb); + ziti_close(clt->ziti_conn, on_ziti_close); } else { ziti_close_write(clt->ziti_conn); @@ -246,8 +251,7 @@ static void data_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { ZITI_LOG(DEBUG, "connection closed %s [%zd/%s](%s)", clt->addr_s, nread, uv_err_name(nread), uv_strerror(nread)); - ziti_conn_set_data(clt->ziti_conn, NULL); - ziti_close(&clt->ziti_conn); + ziti_close(clt->ziti_conn, on_ziti_close); uv_read_stop(stream); uv_close((uv_handle_t *) stream, close_cb); @@ -273,7 +277,7 @@ void on_ziti_connect(ziti_connection conn, int status) { else { ZITI_LOG(ERROR, "ziti connect failed: %s(%d)", ziti_errorstr(status), status); uv_close((uv_handle_t *) clt, close_cb); - ziti_close(&conn); + ziti_close(conn, on_ziti_close); } } @@ -294,7 +298,7 @@ ssize_t on_ziti_data(ziti_connection conn, uint8_t *data, ssize_t len) { if (!uv_is_active((const uv_handle_t *) clt)) { c->closed = true; ZITI_LOG(DEBUG, "tcp side of client[%s] is closed", c->addr_s); - ziti_close(&c->ziti_conn); + ziti_close(c->ziti_conn, on_ziti_close); return UV_ECONNABORTED; } diff --git a/tests/model_tests.cpp b/tests/model_tests.cpp index 0e7cf0f0..84191f4f 100644 --- a/tests/model_tests.cpp +++ b/tests/model_tests.cpp @@ -186,6 +186,7 @@ TEST_CASE("test string escape", "[model]") { XX(bar, json, none, bar, __VA_ARGS__) \ XX(ok, bool, none, ok, __VA_ARGS__) +#undef MODEL_API #define MODEL_API static DECLARE_MODEL(Baz, baz_model) IMPL_MODEL(Baz, baz_model) diff --git a/tests/test_ziti_model.cpp b/tests/test_ziti_model.cpp index fc4ad1d6..b1ca9ad7 100644 --- a/tests/test_ziti_model.cpp +++ b/tests/test_ziti_model.cpp @@ -22,7 +22,9 @@ limitations under the License. #define timegm(v) _mkgmtime(v) #define gmtime(v) _gmtime32(v) #else -#define _GNU_SOURCE //add time.h include after defining _GNU_SOURCE +# ifndef _GNU_SOURCE +# define _GNU_SOURCE //add time.h include after defining _GNU_SOURCE +# endif #include #endif