Skip to content

Commit

Permalink
set channel readability before each IO phase
Browse files Browse the repository at this point in the history
this enables backpressure when application cannot keep up with incoming data
  • Loading branch information
ekoby committed Dec 14, 2023
1 parent f9ed9f3 commit 98d6995
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ else ()

FetchContent_Declare(tlsuv
GIT_REPOSITORY https://github.com/openziti/tlsuv.git
GIT_TAG v0.26.1
GIT_TAG v0.28.0
)
FetchContent_MakeAvailable(tlsuv)

Expand Down
4 changes: 3 additions & 1 deletion inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ struct ziti_ctx {

uv_timer_t *api_session_timer;
uv_timer_t *service_refresh_timer;
uv_prepare_t *reaper;
uv_prepare_t *prepper;

uv_loop_t *loop;
uv_thread_t loop_thread;
Expand Down Expand Up @@ -329,6 +329,8 @@ uint64_t ziti_channel_latency(ziti_channel_t *ch);

int ziti_channel_connect(ziti_context ztx, const char *name, const char *url, ch_connect_cb, void *ctx);

int ziti_channel_prepare(ziti_channel_t *ch);

int ziti_channel_close(ziti_channel_t *ch, int err);

void ziti_channel_add_receiver(ziti_channel_t *ch, int id, void *receiver, void (*receive_f)(void *, message *, int));
Expand Down
26 changes: 24 additions & 2 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ static void ch_connect_timeout(uv_timer_t *t);

static void hello_reply_cb(void *ctx, message *msg, int err);

static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf);
static void process_inbound(ziti_channel_t *ch);

// global channel sequence
static uint32_t channel_counter = 0;

Expand Down Expand Up @@ -118,6 +122,23 @@ static void ch_init_stream(ziti_channel_t *ch) {
}
}

int ziti_channel_prepare(ziti_channel_t *ch) {
process_inbound(ch);

// process_inbound() may consume all message buffers from the pool,
// but it will put ziti connection(s) into `flush` state
// activating uv_idle_t handle, causing zero-timeout IO
// and a flush attempt on the next loop iteration
if (ch->state == Connected) {
if (pool_has_available(ch->in_msg_pool) || ch->in_next != NULL) {
tlsuv_stream_read_start(ch->connection, channel_alloc_cb, on_channel_data);
} else {
tlsuv_stream_read_stop(ch->connection);
}
}
return 0;
}

static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t id, tls_context *tls) {
ch->ctx = ctx;
ch->loop = ctx->loop;
Expand Down Expand Up @@ -823,13 +844,14 @@ static void channel_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_
}

static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
tlsuv_stream_t *mbed = (tlsuv_stream_t *) s;
ziti_channel_t *ch = mbed->data;
tlsuv_stream_t *ssl = (tlsuv_stream_t *) s;
ziti_channel_t *ch = ssl->data;

if (len < 0) {
free(buf->base);
switch (len) {
case UV_ENOBUFS:
tlsuv_stream_read_stop(ssl);
CH_LOG(VERBOSE, "blocked until messages are processed");
return;
case UV_EOF:
Expand Down
42 changes: 31 additions & 11 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ static void ziti_init_async(ziti_context ztx, void *data);

static void ziti_re_auth(ziti_context ztx);

static void grim_reaper(uv_prepare_t *p);
static void ztx_prepare(uv_prepare_t *prep);
static void grim_reaper(ziti_context ztx);

static void ztx_work_async(uv_async_t *ar);

Expand Down Expand Up @@ -419,7 +420,7 @@ uv_timer_t* new_ztx_timer(ziti_context ztx) {
static void ziti_start_internal(ziti_context ztx, void *init_req) {
if (!ztx->enabled) {
ztx->enabled = true;
uv_prepare_start(ztx->reaper, grim_reaper);
uv_prepare_start(ztx->prepper, ztx_prepare);
ziti_ctrl_get_version(&ztx->controller, version_cb, ztx);
ziti_set_unauthenticated(ztx);

Expand Down Expand Up @@ -464,10 +465,10 @@ static void ziti_init_async(ziti_context ztx, void *data) {
ztx->api_session_timer = new_ztx_timer(ztx);
ztx->service_refresh_timer = new_ztx_timer(ztx);

ztx->reaper = calloc(1, sizeof(uv_prepare_t));
uv_prepare_init(loop, ztx->reaper);
ztx->reaper->data = ztx;
uv_unref((uv_handle_t *) ztx->reaper);
ztx->prepper = calloc(1, sizeof(uv_prepare_t));
uv_prepare_init(loop, ztx->prepper);
ztx->prepper->data = ztx;
uv_unref((uv_handle_t *) ztx->prepper);

ZTX_LOG(DEBUG, "using metrics interval: %d", (int) ztx->opts.metrics_type);
metrics_rate_init(&ztx->up_rate, ztx->opts.metrics_type);
Expand Down Expand Up @@ -583,8 +584,8 @@ static void shutdown_and_free(ziti_context ztx) {
return;
}

grim_reaper(ztx->reaper);
CLOSE_AND_NULL(ztx->reaper);
grim_reaper(ztx);
CLOSE_AND_NULL(ztx->prepper);
CLOSE_AND_NULL(ztx->api_session_timer);
CLOSE_AND_NULL(ztx->service_refresh_timer);

Expand Down Expand Up @@ -1714,15 +1715,13 @@ const ziti_version *ziti_get_version() {
return &sdk_version;
}

static void grim_reaper(uv_prepare_t *p) {
ziti_context ztx = p->data;
static void grim_reaper(ziti_context ztx) {

size_t total = model_map_size(&ztx->connections);
size_t count = 0;

if (total == 0 && !ztx->enabled) {
// context disabled and no connections
uv_prepare_stop(p);
return;
}

Expand All @@ -1738,6 +1737,27 @@ static void grim_reaper(uv_prepare_t *p) {
}
}

void ztx_prepare(uv_prepare_t *prep) {
ziti_context ztx = prep->data;

grim_reaper(ztx);

// prepare channels for IO
// NOTE: stalled ziti connections are flushed with idle handlers,
// which run before prepare, which means that message
// buffers could be returned to their corresponding channels
// therefore enabling channel read if it was blocked
const char *id;
ziti_channel_t *ch;
MODEL_MAP_FOREACH(id, ch, &ztx->channels) {
ziti_channel_prepare(ch);
}

if (!ztx->enabled) {
uv_prepare_stop(ztx->prepper);
}
}

void ziti_on_channel_event(ziti_channel_t *ch, ziti_router_status status, ziti_context ztx) {
ziti_event_t ev = {
.type = ZitiRouterEvent,
Expand Down

0 comments on commit 98d6995

Please sign in to comment.