Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve throughput performance #3 (rx buffer focus) #749

Merged
merged 12 commits into from
Oct 21, 2024
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ int main(int argc, char **argv) {

ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 2);
ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 1);
for (size_t i = 0; i < 1; ++i) {
ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].key));
ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].value));
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/arc_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ static inline _z_arc_slice_t _z_arc_slice_empty(void) { return (_z_arc_slice_t){
static inline size_t _z_arc_slice_len(const _z_arc_slice_t* s) { return s->len; }
static inline bool _z_arc_slice_is_empty(const _z_arc_slice_t* s) { return _z_arc_slice_len(s) == 0; }
_z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len);
_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len);
_z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len);
const uint8_t* _z_arc_slice_data(const _z_arc_slice_t* s);
z_result_t _z_arc_slice_copy(_z_arc_slice_t* dst, const _z_arc_slice_t* src);
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ size_t _z_simple_rc_strong_count(void *cnt);
*p = name##_simple_rc_null(); \
return res; \
} \
static inline size_t name##_simple_rc_count(const name##_simple_rc_t *p) { \
return _z_simple_rc_strong_count(p->_cnt); \
} \
static inline size_t name##_simple_rc_size(name##_simple_rc_t *p) { \
_ZP_UNUSED(p); \
return sizeof(name##_simple_rc_t); \
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ _z_reply_t _z_reply_move(_z_reply_t *src_reply);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
z_result_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment);
_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t *attachment);
_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding);

typedef struct _z_pending_reply_t {
Expand Down
5 changes: 5 additions & 0 deletions include/zenoh-pico/protocol/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/collections/arc_slice.h"
#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/slice.h"
#include "zenoh-pico/collections/vec.h"
Expand All @@ -40,6 +41,7 @@ _z_iosli_t _z_iosli_wrap(const uint8_t *buf, size_t length, size_t r_pos, size_t
size_t _z_iosli_readable(const _z_iosli_t *ios);
uint8_t _z_iosli_read(_z_iosli_t *ios);
void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dest, size_t offset, size_t length);
void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src);
uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos);

size_t _z_iosli_writable(const _z_iosli_t *ios);
Expand All @@ -62,8 +64,10 @@ _Z_VEC_DEFINE(_z_iosli, _z_iosli_t)
/*------------------ ZBuf ------------------*/
typedef struct {
_z_iosli_t _ios;
_z_slice_simple_rc_t _slice;
} _z_zbuf_t;

static inline size_t _z_zbuf_get_ref_count(const _z_zbuf_t *zbf) { return _z_slice_simple_rc_count(&zbf->_slice); }
_z_zbuf_t _z_zbuf_make(size_t capacity);
_z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length);
/// Constructs a _borrowing_ reader on `slice`
Expand All @@ -72,6 +76,7 @@ _z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice);
size_t _z_zbuf_capacity(const _z_zbuf_t *zbf);
uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf);
size_t _z_zbuf_len(const _z_zbuf_t *zbf);
void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src);
bool _z_zbuf_can_read(const _z_zbuf_t *zbf);
size_t _z_zbuf_space_left(const _z_zbuf_t *zbf);

Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/multicast/rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
z_result_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr);
z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg,
_z_slice_t *addr);
z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm);

#endif /* ZENOH_PICO_TRANSPORT_LINK_RX_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/raweth/rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@

z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr);
z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr);
z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm);

#endif /* ZENOH_PICO_RAWETH_RX_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/unicast/rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
z_result_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg);
z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg);
z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg);
z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu);

#endif /* ZENOH_PICO_UNICAST_RX_H */
12 changes: 12 additions & 0 deletions src/collections/arc_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ _z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len) {
return arc_s;
}

_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len) {
assert(offset + len <= _Z_RC_IN_VAL(slice_rc)->len);
_z_arc_slice_t arc_s;
arc_s.slice = _z_slice_simple_rc_clone(slice_rc);
if (_Z_RC_IS_NULL(&arc_s.slice)) {
return _z_arc_slice_empty();
}
arc_s.len = len;
arc_s.start = offset;
return arc_s;
}

_z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len) {
assert(offset + len <= s->len);
assert(!_Z_RC_IS_NULL(&s->slice) || (len == 0 && offset == 0));
Expand Down
16 changes: 9 additions & 7 deletions src/net/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,19 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) {
_z_timestamp_clear(&pr->_tstamp);
}

_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t *attachment) {
_z_reply_t reply = _z_reply_null();
reply.data._tag = _Z_REPLY_TAG_DATA;
reply.data.replier_id = id;

// Create reply sample
reply.data._result.sample.keyexpr = _z_keyexpr_steal(&keyexpr);
reply.data._result.sample.keyexpr = _z_keyexpr_steal(keyexpr);
reply.data._result.sample.kind = kind;
reply.data._result.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data._result.sample.payload, &payload);
_z_bytes_copy(&reply.data._result.sample.attachment, &attachment);
_z_bytes_copy(&reply.data._result.sample.payload, payload);
_z_bytes_copy(&reply.data._result.sample.attachment, attachment);
_z_encoding_move(&reply.data._result.sample.encoding, encoding);

return reply;
Expand All @@ -112,8 +113,9 @@ _z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding
return reply;
}
#else
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t *attachment) {
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(id);
_ZP_UNUSED(payload);
Expand Down
13 changes: 8 additions & 5 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/utils/endianness.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/pointers.h"
#include "zenoh-pico/utils/result.h"

/*------------------ uint8 -------------------*/
Expand Down Expand Up @@ -281,13 +282,15 @@ z_result_t _z_slice_val_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice
z_result_t _z_slice_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice_decode_na(bs, zbf); }

z_result_t _z_bytes_decode(_z_bytes_t *bs, _z_zbuf_t *zbf) {
*bs = _z_bytes_null();
// Decode slice
_z_slice_t s;
_Z_RETURN_IF_ERR(_z_slice_decode(&s, zbf));
if (_z_slice_is_alloced(&s)) {
return _z_bytes_from_slice(bs, s);
} else {
return _z_bytes_from_buf(bs, s.start, s.len);
}
// Calc offset
size_t offset = _z_ptr_u8_diff(s.start, _Z_RC_IN_VAL(&zbf->_slice)->start);
// Get ownership of subslice
_z_arc_slice_t arcs = _z_arc_slice_wrap_slice_rc(&zbf->_slice, offset, s.len);
return _z_bytes_append_slice(bs, &arcs);
}

z_result_t _z_bytes_encode_val(_z_wbuf_t *wbf, const _z_bytes_t *bs) {
Expand Down
28 changes: 26 additions & 2 deletions src/protocol/iobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string.h>

#include "zenoh-pico/config.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/pointers.h"
#include "zenoh-pico/utils/result.h"

Expand Down Expand Up @@ -75,6 +76,13 @@ void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dst, size_t offset, size_t le
ios->_r_pos = ios->_r_pos + length;
}

void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src) {
size_t length = _z_iosli_readable(src);
assert(_z_iosli_readable(dst) >= length);
(void)memcpy(dst->_buf + dst->_w_pos, src->_buf + src->_r_pos, length);
dst->_w_pos += length;
}

uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos) {
assert(pos < ios->_capacity);
return ios->_buf[pos];
Expand Down Expand Up @@ -160,15 +168,26 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) {

/*------------------ ZBuf ------------------*/
_z_zbuf_t _z_zbuf_make(size_t capacity) {
_z_zbuf_t zbf;
_z_zbuf_t zbf = {0};
zbf._ios = _z_iosli_make(capacity);
if (_z_zbuf_capacity(&zbf) == 0) {
return zbf;
}
_z_slice_t s = _z_slice_from_buf_custom_deleter(zbf._ios._buf, zbf._ios._capacity, _z_delete_context_default());
zbf._slice = _z_slice_simple_rc_new_from_val(&s);
if (_Z_RC_IS_NULL(&zbf._slice)) {
_Z_ERROR("slice rc creation failed");
_z_iosli_clear(&zbf._ios);
}
zbf._ios._is_alloc = false;
return zbf;
}

_z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length) {
assert(_z_iosli_readable(&zbf->_ios) >= length);
_z_zbuf_t v;
v._ios = _z_iosli_wrap(_z_zbuf_get_rptr(zbf), length, 0, length);
v._slice = zbf->_slice;
return v;
}
_z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice) {
Expand All @@ -188,6 +207,8 @@ uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf) {
}
size_t _z_zbuf_len(const _z_zbuf_t *zbf) { return _z_iosli_readable(&zbf->_ios); }

void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src) { _z_iosli_copy_bytes(&dst->_ios, &src->_ios); }

bool _z_zbuf_can_read(const _z_zbuf_t *zbf) { return _z_zbuf_len(zbf) > (size_t)0; }

uint8_t _z_zbuf_read(_z_zbuf_t *zbf) { return _z_iosli_read(&zbf->_ios); }
Expand Down Expand Up @@ -218,7 +239,10 @@ uint8_t *_z_zbuf_get_wptr(const _z_zbuf_t *zbf) { return zbf->_ios._buf + zbf->_

void _z_zbuf_reset(_z_zbuf_t *zbf) { _z_iosli_reset(&zbf->_ios); }

void _z_zbuf_clear(_z_zbuf_t *zbf) { _z_iosli_clear(&zbf->_ios); }
void _z_zbuf_clear(_z_zbuf_t *zbf) {
_z_iosli_clear(&zbf->_ios);
_z_slice_simple_rc_drop(&zbf->_slice);
}

void _z_zbuf_compact(_z_zbuf_t *zbf) {
if ((zbf->_ios._r_pos != 0) || (zbf->_ios._w_pos != 0)) {
Expand Down
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
}

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, msg->_payload, &msg->_commons._timestamp,
&msg->_encoding, kind, msg->_attachment);
_z_reply_t reply = _z_reply_create(&expanded_ke, zn->_local_zid, &msg->_payload, &msg->_commons._timestamp,
&msg->_encoding, kind, &msg->_attachment);

bool drop = false;
// Verify if this is a newer reply, free the old one in case it is
Expand Down
20 changes: 12 additions & 8 deletions src/transport/multicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ z_result_t _zp_multicast_read(_z_transport_multicast_t *ztm) {
ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr);
_z_t_msg_clear(&t_msg);
}

ret = _z_multicast_update_rx_buffer(ztm);
if (ret != _Z_RES_OK) {
_Z_ERROR("Failed to allocate rx buffer");
}
return ret;
}
#else
Expand All @@ -57,11 +60,11 @@ void *_zp_multicast_read_task(void *ztm_arg) {
// Prepare the buffer
_z_zbuf_reset(&ztm->_zbuf);

_z_slice_t addr = _z_slice_alias_buf(NULL, 0);
_z_slice_t addr = _z_slice_empty();
while (ztm->_read_task_running == true) {
// Read bytes from socket to the main buffer
size_t to_read = 0;

// Read bytes from socket to the main buffer
switch (ztm->_link._cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) {
Expand Down Expand Up @@ -94,15 +97,13 @@ void *_zp_multicast_read_task(void *ztm_arg) {
default:
break;
}
// Wrap the main buffer for to_read bytes
// Wrap the main buffer to_read bytes
_z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read);

while (_z_zbuf_len(&zbuf) > 0) {
z_result_t ret = _Z_RES_OK;

// Decode one session message
_z_transport_message_t t_msg;
ret = _z_transport_message_decode(&t_msg, &zbuf);
z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf);
if (ret == _Z_RES_OK) {
ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr);

Expand All @@ -119,9 +120,12 @@ void *_zp_multicast_read_task(void *ztm_arg) {
continue;
}
}

// Move the read position of the read buffer
_z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read);
if (_z_multicast_update_rx_buffer(ztm) != _Z_RES_OK) {
_Z_ERROR("Connection closed due to lack of memory to allocate rx buffer");
ztm->_read_task_running = false;
}
}
_z_mutex_unlock(&ztm->_mutex_rx);
return NULL;
Expand Down
21 changes: 21 additions & 0 deletions src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,27 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm,

return ret;
}

z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm) {
// Check if user or defragment buffer took ownership of buffer
if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) {
// Allocate a new buffer
_z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);
if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
// Recopy leftover bytes
size_t leftovers = _z_zbuf_len(&ztm->_zbuf);
if (leftovers > 0) {
_z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf);
}
// Drop buffer & update
_z_zbuf_clear(&ztm->_zbuf);
ztm->_zbuf = new_zbuf;
}
return _Z_RES_OK;
}

#else
z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg,
_z_slice_t *addr) {
Expand Down
8 changes: 8 additions & 0 deletions src/transport/raweth/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ z_result_t _zp_raweth_read(_z_transport_multicast_t *ztm) {
_z_t_msg_clear(&t_msg);
}
_z_slice_clear(&addr);
ret = _z_raweth_update_rx_buff(ztm);
if (ret != _Z_RES_OK) {
_Z_ERROR("Failed to allocate rx buffer");
}
return ret;
}
#else
Expand Down Expand Up @@ -85,6 +89,10 @@ void *_zp_raweth_read_task(void *ztm_arg) {
}
_z_t_msg_clear(&t_msg);
_z_slice_clear(&addr);
if (_z_raweth_update_rx_buff(ztm) != _Z_RES_OK) {
_Z_ERROR("Connection closed due to lack of memory to allocate rx buffer");
ztm->_read_task_running = false;
}
}
return NULL;
}
Expand Down
20 changes: 20 additions & 0 deletions src/transport/raweth/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,26 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_mess
return _z_raweth_recv_t_msg_na(ztm, t_msg, addr);
}

z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm) {
// Check if user or defragment buffer took ownership of buffer
if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) {
// Allocate a new buffer
_z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);
jean-roland marked this conversation as resolved.
Show resolved Hide resolved
if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
// Recopy leftover bytes
size_t leftovers = _z_zbuf_len(&ztm->_zbuf);
if (leftovers > 0) {
_z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf);
}
// Drop buffer & update
_z_zbuf_clear(&ztm->_zbuf);
ztm->_zbuf = new_zbuf;
}
return _Z_RES_OK;
}

#else
z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr) {
_ZP_UNUSED(ztm);
Expand Down
Loading
Loading