Skip to content

Commit

Permalink
Fix channel reconnect (#581)
Browse files Browse the repository at this point in the history
* reset connection on failed connect
* wait for enroller to finish
* update [email protected]
*  set msg seq once
* clear up TLS connection on any error
  • Loading branch information
ekoby authored Dec 5, 2023
1 parent 99394ab commit dd8a94b
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 25 deletions.
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ else ()

FetchContent_Declare(tlsuv
GIT_REPOSITORY https://github.com/openziti/tlsuv.git
GIT_TAG v0.27.0
GIT_TAG v0.27.2
)
FetchContent_MakeAvailable(tlsuv)

Expand Down
2 changes: 1 addition & 1 deletion inc_internal/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ message *message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE]);

message *message_new(pool_t *pool, uint32_t content, const hdr_t *headers, int nheaders, size_t body_len);

void message_set_seq(message *m, uint32_t seq);
void message_set_seq(message *m, uint32_t *seq);


#ifdef __cplusplus
Expand Down
3 changes: 2 additions & 1 deletion library/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ void on_unbind(void *ctx, message *m, int code) {
.value = (uint8_t *) &conn_id
},
};
ziti_channel_send(b->ch, ContentTypeStateClosed, headers, 1, NULL, 0, NULL);
message *close_msg = message_new(NULL, ContentTypeStateClosed, headers, 1, 0);
ziti_channel_send_message(b->ch, close_msg, NULL);
} else {
CONN_LOG(TRACE, "failed to receive unbind response because channel was disconnected: %d/%s", code, ziti_errorstr(code));
}
Expand Down
15 changes: 11 additions & 4 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
ch->ctx = ctx;
ch->loop = ctx->loop;
ch->id = id;
ch->msg_seq = -1;
// ch->msg_seq = 0;

char hostname[MAXHOSTNAMELEN];
size_t hostlen = sizeof(hostname);
Expand Down Expand Up @@ -363,6 +363,9 @@ void on_channel_send(uv_write_t *w, int status) {

int ziti_channel_send_message(ziti_channel_t *ch, message *msg, struct ziti_write_req_s *ziti_write) {
uv_buf_t buf = uv_buf_init((char *) msg->msgbufp, msg->msgbuflen);
message_set_seq(msg, &ch->msg_seq);
CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", msg->header.content, msg->header.seq, msg->header.body_len);

NEWP(req, uv_write_t);
if (ziti_write == NULL) {
ziti_write = calloc(1, sizeof(struct ziti_write_req_s));
Expand All @@ -386,7 +389,7 @@ int ziti_channel_send(ziti_channel_t *ch, uint32_t content, const hdr_t *hdrs, i
uint32_t body_len,
struct ziti_write_req_s *ziti_write) {
message *m = message_new(NULL, content, hdrs, nhdrs, body_len);
message_set_seq(m, ch->msg_seq++);
message_set_seq(m, &ch->msg_seq);
CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", content, m->header.seq, body_len);
memcpy(m->body, body, body_len);

Expand All @@ -406,8 +409,7 @@ ziti_channel_send_for_reply(ziti_channel_t *ch, uint32_t content, const hdr_t *h
reply_cb rep_cb, void *reply_ctx) {
struct waiter_s *result = NULL;
message *m = message_new(NULL, content, hdrs, nhdrs, body_len);
message_set_seq(m, ch->msg_seq++);
CH_LOG(TRACE, "=> ct[%04X] seq[%d] len[%d]", content, m->header.seq, body_len);
message_set_seq(m, &ch->msg_seq);
memcpy(m->body, body, body_len);

if (rep_cb != NULL) {
Expand Down Expand Up @@ -838,6 +840,8 @@ static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
CH_LOG(INFO, "channel was closed [%zd/%s]", len, uv_strerror(len));
// propagate close
on_channel_close(ch, ZITI_CONNABORT, len);
tlsuv_stream_close(ch->connection, on_tls_close);
ch->connection = NULL;
break;

default:
Expand Down Expand Up @@ -881,6 +885,9 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
free(r);
}

tlsuv_stream_close(ch->connection, on_tls_close);
ch->connection = NULL;

if (ch->state != Closed) {
ch->state = Disconnected;
reconnect_channel(ch, false);
Expand Down
11 changes: 7 additions & 4 deletions library/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "utils.h"
#include "endian_internal.h"

static uint8_t *read_int32(const uint8_t *p, uint32_t *val) {
static const uint8_t *read_int32(const uint8_t *p, uint32_t *val) {
*val = le32toh(*(uint32_t *) p);
return p + sizeof(uint32_t);
}
Expand Down Expand Up @@ -84,7 +84,7 @@ uint8_t *write_hdr(const hdr_t *h, uint8_t *buf) {
}

int parse_hdrs(uint8_t *buf, uint32_t len, hdr_t **hp) {
uint8_t *p = buf;
const uint8_t *p = buf;

hdr_t *headers = NULL;
int count = 0;
Expand Down Expand Up @@ -230,7 +230,10 @@ message *message_new(pool_t *pool, uint32_t content, const hdr_t *hdrs, int nhdr
return m;
}

void message_set_seq(message *m, uint32_t seq) {
m->header.seq = seq;
void message_set_seq(message *m, uint32_t *seq) {
if (m->header.seq == 0) {
*seq += 1;
m->header.seq = *seq;
}
header_to_buffer(&m->header, m->msgbufp);
}
6 changes: 3 additions & 3 deletions library/ziti_ctrl.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static void ctrl_paging_req(struct ctrl_resp *resp);

static void ctrl_default_cb(void *s, const ziti_error *e, struct ctrl_resp *resp);

static void ctrl_body_cb(tlsuv_http_req_t *req, const char *b, ssize_t len);
static void ctrl_body_cb(tlsuv_http_req_t *req, char *b, ssize_t len);

static tlsuv_http_req_t *
start_request(tlsuv_http_t *http, const char *method, const char *path, tlsuv_http_resp_cb cb, struct ctrl_resp *resp) {
Expand Down Expand Up @@ -285,11 +285,11 @@ static void ctrl_service_cb(ziti_service **services, ziti_error *e, struct ctrl_
free(services);
}

static void free_body_cb(tlsuv_http_req_t *req, const char *body, ssize_t len) {
static void free_body_cb(tlsuv_http_req_t *req, char *body, ssize_t len) {
free((char *) body);
}

static void ctrl_body_cb(tlsuv_http_req_t *req, const char *b, ssize_t len) {
static void ctrl_body_cb(tlsuv_http_req_t *req, char *b, ssize_t len) {
struct ctrl_resp *resp = req->data;
ziti_controller *ctrl = resp->ctrl;

Expand Down
4 changes: 2 additions & 2 deletions programs/sample-host/sample-host.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void on_connect(ziti_connection conn, int status) {

static size_t total;

ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) {
ssize_t on_data(ziti_connection c, const uint8_t *buf, ssize_t len) {
if (len == ZITI_EOF) {

printf("request completed: %s\n", ziti_errorstr(len));
Expand All @@ -152,7 +152,7 @@ ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) {
static uv_signal_t sig;
static void on_signal(uv_signal_t *h, int signal) {
ziti_context ztx = h->data;
ziti_dump(ztx, fprintf, stdout);
ziti_dump(ztx, (int (*)(void *, const char *, ...)) fprintf, stdout);
}

static void on_ziti_init(ziti_context ztx, const ziti_event_t *ev) {
Expand Down
2 changes: 1 addition & 1 deletion programs/sample_http_link/sample_http_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void resp_cb(tlsuv_http_resp_t *resp, void *data) {
printf("\n");
}

void body_cb(tlsuv_http_req_t *req, const char *body, ssize_t len) {
void body_cb(tlsuv_http_req_t *req, char *body, ssize_t len) {
if (len == UV_EOF) {
printf("\n\n====================\nRequest completed\n");
ziti_shutdown(ziti);
Expand Down
2 changes: 1 addition & 1 deletion programs/sample_wttr/sample_wttr.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exit(code);\
static size_t total;
static ziti_context ziti;

ssize_t on_data(ziti_connection c, uint8_t *buf, ssize_t len) {
ssize_t on_data(ziti_connection c, const uint8_t *buf, ssize_t len) {
if (len == ZITI_EOF) {

printf("request completed: %s\n", ziti_errorstr(len));
Expand Down
2 changes: 2 additions & 0 deletions tests/integ/bootstrap.exp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ expect {
"ziti identity is saved in ./test-server.json" {}
eof { error "test-server not enrolled" }
}
wait

spawn $enroller ./test-client.jwt ./test-client.json
expect {
"ziti identity is saved in ./test-client.json" {}
eof { error "test-client not enrolled" }
}
wait



Expand Down
18 changes: 12 additions & 6 deletions tests/message_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ TEST_CASE("simple", "[model]") {
},
};
auto content1 = "this is a message";
uint32_t s1 = 3333;
auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1));
strncpy(reinterpret_cast<char *>(m1->body), content1, strlen(content1));
message_set_seq(m1, 3333);
message_set_seq(m1, &s1);

auto m2 = message_new_from_header(p, m1->msgbufp);
CHECK(m2->header.seq == 3333);
CHECK(m2->header.seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen);
m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs);
Expand Down Expand Up @@ -75,13 +76,16 @@ TEST_CASE("large", "[model]") {
.value = (uint8_t *) "bar"
},
};
uint32_t seq = 3333;
auto content1 = "this is a very long message, it won't fint into the pooled message structure";
auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1));
strncpy(reinterpret_cast<char *>(m1->body), content1, strlen(content1));
message_set_seq(m1, 3333);
message_set_seq(m1, &seq);


auto m2 = message_new_from_header(p, m1->msgbufp);
CHECK(m2->header.seq == 3333);
CHECK(m2->header.seq == 3334);
CHECK(seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen);
m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs);
Expand Down Expand Up @@ -116,13 +120,15 @@ TEST_CASE("large unpooled", "[model]") {
.value = (uint8_t *) "bar"
},
};
uint32_t seq = 3333;
auto content1 = "this is a very long message, it won't fint into the pooled message structure";
auto m1 = message_new(p, ContentTypeData, headers, 2, strlen(content1));
strncpy(reinterpret_cast<char *>(m1->body), content1, strlen(content1));
message_set_seq(m1, 3333);
message_set_seq(m1, &seq);

auto m2 = message_new_from_header(nullptr, m1->msgbufp);
CHECK(m2->header.seq == 3333);
CHECK(m2->header.seq == 3334);
CHECK(seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen);
m2->nhdrs = parse_hdrs(m2->headers, m2->header.headers_len, &m2->hdrs);
Expand Down
2 changes: 1 addition & 1 deletion tests/ziti_src_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ TEST_CASE("httpbin.ziti:ziti_src", "[integ]") {
auto t = (source_test*)ctx;
t->code = resp->code;

resp->body_cb = [](tlsuv_http_req_t *req, const char *body, ssize_t len){
resp->body_cb = [](tlsuv_http_req_t *req, char *body, ssize_t len){
auto t = (source_test*)req->data;
if (len > 0)
t->body.append(body, len);
Expand Down

0 comments on commit dd8a94b

Please sign in to comment.