diff --git a/examples/unix/c11/z_get_attachment.c b/examples/unix/c11/z_get_attachment.c index 842273fc2..65404138b 100644 --- a/examples/unix/c11/z_get_attachment.c +++ b/examples/unix/c11/z_get_attachment.c @@ -178,7 +178,7 @@ int main(int argc, char **argv) { ze_owned_serializer_t serializer; ze_serializer_empty(&serializer); - ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 2); + ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 1); for (size_t i = 0; i < 1; ++i) { ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].key)); ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].value)); diff --git a/include/zenoh-pico/collections/arc_slice.h b/include/zenoh-pico/collections/arc_slice.h index f74d9f969..ebc21bb56 100644 --- a/include/zenoh-pico/collections/arc_slice.h +++ b/include/zenoh-pico/collections/arc_slice.h @@ -46,6 +46,7 @@ static inline _z_arc_slice_t _z_arc_slice_empty(void) { return (_z_arc_slice_t){ static inline size_t _z_arc_slice_len(const _z_arc_slice_t* s) { return s->len; } static inline bool _z_arc_slice_is_empty(const _z_arc_slice_t* s) { return _z_arc_slice_len(s) == 0; } _z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len); +_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len); _z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len); const uint8_t* _z_arc_slice_data(const _z_arc_slice_t* s); z_result_t _z_arc_slice_copy(_z_arc_slice_t* dst, const _z_arc_slice_t* src); diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 644d167f9..bcbd8f411 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -244,6 +244,9 @@ size_t _z_simple_rc_strong_count(void *cnt); *p = name##_simple_rc_null(); \ return res; \ } \ + static inline size_t name##_simple_rc_count(const name##_simple_rc_t *p) { \ + return _z_simple_rc_strong_count(p->_cnt); \ + } \ static inline size_t name##_simple_rc_size(name##_simple_rc_t *p) { \ _ZP_UNUSED(p); \ return sizeof(name##_simple_rc_t); \ diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h index c68fcf678..3e1a1994b 100644 --- a/include/zenoh-pico/net/reply.h +++ b/include/zenoh-pico/net/reply.h @@ -89,8 +89,9 @@ _z_reply_t _z_reply_move(_z_reply_t *src_reply); void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); z_result_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src); -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment); +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment); _z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding); typedef struct _z_pending_reply_t { diff --git a/include/zenoh-pico/protocol/iobuf.h b/include/zenoh-pico/protocol/iobuf.h index 888b69d46..7f1ebf722 100644 --- a/include/zenoh-pico/protocol/iobuf.h +++ b/include/zenoh-pico/protocol/iobuf.h @@ -19,6 +19,7 @@ #include #include +#include "zenoh-pico/collections/arc_slice.h" #include "zenoh-pico/collections/element.h" #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/collections/vec.h" @@ -40,6 +41,7 @@ _z_iosli_t _z_iosli_wrap(const uint8_t *buf, size_t length, size_t r_pos, size_t size_t _z_iosli_readable(const _z_iosli_t *ios); uint8_t _z_iosli_read(_z_iosli_t *ios); void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dest, size_t offset, size_t length); +void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src); uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos); size_t _z_iosli_writable(const _z_iosli_t *ios); @@ -62,8 +64,10 @@ _Z_VEC_DEFINE(_z_iosli, _z_iosli_t) /*------------------ ZBuf ------------------*/ typedef struct { _z_iosli_t _ios; + _z_slice_simple_rc_t _slice; } _z_zbuf_t; +static inline size_t _z_zbuf_get_ref_count(const _z_zbuf_t *zbf) { return _z_slice_simple_rc_count(&zbf->_slice); } _z_zbuf_t _z_zbuf_make(size_t capacity); _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length); /// Constructs a _borrowing_ reader on `slice` @@ -72,6 +76,7 @@ _z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice); size_t _z_zbuf_capacity(const _z_zbuf_t *zbf); uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf); size_t _z_zbuf_len(const _z_zbuf_t *zbf); +void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src); bool _z_zbuf_can_read(const _z_zbuf_t *zbf); size_t _z_zbuf_space_left(const _z_zbuf_t *zbf); diff --git a/include/zenoh-pico/transport/multicast/rx.h b/include/zenoh-pico/transport/multicast/rx.h index 31a10e64a..97b1e8960 100644 --- a/include/zenoh-pico/transport/multicast/rx.h +++ b/include/zenoh-pico/transport/multicast/rx.h @@ -20,5 +20,6 @@ z_result_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); +z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm); #endif /* ZENOH_PICO_TRANSPORT_LINK_RX_H */ diff --git a/include/zenoh-pico/transport/raweth/rx.h b/include/zenoh-pico/transport/raweth/rx.h index 81874d633..4607dd6b2 100644 --- a/include/zenoh-pico/transport/raweth/rx.h +++ b/include/zenoh-pico/transport/raweth/rx.h @@ -19,5 +19,6 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); +z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm); #endif /* ZENOH_PICO_RAWETH_RX_H */ diff --git a/include/zenoh-pico/transport/unicast/rx.h b/include/zenoh-pico/transport/unicast/rx.h index f36cb592c..1fa5b60d7 100644 --- a/include/zenoh-pico/transport/unicast/rx.h +++ b/include/zenoh-pico/transport/unicast/rx.h @@ -20,5 +20,6 @@ z_result_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); +z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu); #endif /* ZENOH_PICO_UNICAST_RX_H */ diff --git a/src/collections/arc_slice.c b/src/collections/arc_slice.c index 9c5f6c824..17d17a8eb 100644 --- a/src/collections/arc_slice.c +++ b/src/collections/arc_slice.c @@ -32,6 +32,18 @@ _z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len) { return arc_s; } +_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len) { + assert(offset + len <= _Z_RC_IN_VAL(slice_rc)->len); + _z_arc_slice_t arc_s; + arc_s.slice = _z_slice_simple_rc_clone(slice_rc); + if (_Z_RC_IS_NULL(&arc_s.slice)) { + return _z_arc_slice_empty(); + } + arc_s.len = len; + arc_s.start = offset; + return arc_s; +} + _z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len) { assert(offset + len <= s->len); assert(!_Z_RC_IS_NULL(&s->slice) || (len == 0 && offset == 0)); diff --git a/src/net/reply.c b/src/net/reply.c index a331ae6cf..67987cb35 100644 --- a/src/net/reply.c +++ b/src/net/reply.c @@ -87,18 +87,19 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) { _z_timestamp_clear(&pr->_tstamp); } -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment) { _z_reply_t reply = _z_reply_null(); reply.data._tag = _Z_REPLY_TAG_DATA; reply.data.replier_id = id; // Create reply sample - reply.data._result.sample.keyexpr = _z_keyexpr_steal(&keyexpr); + reply.data._result.sample.keyexpr = _z_keyexpr_steal(keyexpr); reply.data._result.sample.kind = kind; reply.data._result.sample.timestamp = _z_timestamp_duplicate(timestamp); - _z_bytes_copy(&reply.data._result.sample.payload, &payload); - _z_bytes_copy(&reply.data._result.sample.attachment, &attachment); + _z_bytes_copy(&reply.data._result.sample.payload, payload); + _z_bytes_copy(&reply.data._result.sample.attachment, attachment); _z_encoding_move(&reply.data._result.sample.encoding, encoding); return reply; @@ -112,8 +113,9 @@ _z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding return reply; } #else -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment) { _ZP_UNUSED(keyexpr); _ZP_UNUSED(id); _ZP_UNUSED(payload); diff --git a/src/protocol/codec.c b/src/protocol/codec.c index 0d6656327..8574d2d89 100644 --- a/src/protocol/codec.c +++ b/src/protocol/codec.c @@ -19,6 +19,7 @@ #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/pointers.h" #include "zenoh-pico/utils/result.h" /*------------------ uint8 -------------------*/ @@ -281,13 +282,15 @@ z_result_t _z_slice_val_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice z_result_t _z_slice_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice_decode_na(bs, zbf); } z_result_t _z_bytes_decode(_z_bytes_t *bs, _z_zbuf_t *zbf) { + *bs = _z_bytes_null(); + // Decode slice _z_slice_t s; _Z_RETURN_IF_ERR(_z_slice_decode(&s, zbf)); - if (_z_slice_is_alloced(&s)) { - return _z_bytes_from_slice(bs, s); - } else { - return _z_bytes_from_buf(bs, s.start, s.len); - } + // Calc offset + size_t offset = _z_ptr_u8_diff(s.start, _Z_RC_IN_VAL(&zbf->_slice)->start); + // Get ownership of subslice + _z_arc_slice_t arcs = _z_arc_slice_wrap_slice_rc(&zbf->_slice, offset, s.len); + return _z_bytes_append_slice(bs, &arcs); } z_result_t _z_bytes_encode_val(_z_wbuf_t *wbf, const _z_bytes_t *bs) { diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index c398bc766..7b4b361dc 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -20,6 +20,7 @@ #include #include "zenoh-pico/config.h" +#include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/pointers.h" #include "zenoh-pico/utils/result.h" @@ -75,6 +76,13 @@ void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dst, size_t offset, size_t le ios->_r_pos = ios->_r_pos + length; } +void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src) { + size_t length = _z_iosli_readable(src); + assert(_z_iosli_readable(dst) >= length); + (void)memcpy(dst->_buf + dst->_w_pos, src->_buf + src->_r_pos, length); + dst->_w_pos += length; +} + uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos) { assert(pos < ios->_capacity); return ios->_buf[pos]; @@ -160,8 +168,18 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) { /*------------------ ZBuf ------------------*/ _z_zbuf_t _z_zbuf_make(size_t capacity) { - _z_zbuf_t zbf; + _z_zbuf_t zbf = {0}; zbf._ios = _z_iosli_make(capacity); + if (_z_zbuf_capacity(&zbf) == 0) { + return zbf; + } + _z_slice_t s = _z_slice_from_buf_custom_deleter(zbf._ios._buf, zbf._ios._capacity, _z_delete_context_default()); + zbf._slice = _z_slice_simple_rc_new_from_val(&s); + if (_Z_RC_IS_NULL(&zbf._slice)) { + _Z_ERROR("slice rc creation failed"); + _z_iosli_clear(&zbf._ios); + } + zbf._ios._is_alloc = false; return zbf; } @@ -169,6 +187,7 @@ _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length) { assert(_z_iosli_readable(&zbf->_ios) >= length); _z_zbuf_t v; v._ios = _z_iosli_wrap(_z_zbuf_get_rptr(zbf), length, 0, length); + v._slice = zbf->_slice; return v; } _z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice) { @@ -188,6 +207,8 @@ uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf) { } size_t _z_zbuf_len(const _z_zbuf_t *zbf) { return _z_iosli_readable(&zbf->_ios); } +void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src) { _z_iosli_copy_bytes(&dst->_ios, &src->_ios); } + bool _z_zbuf_can_read(const _z_zbuf_t *zbf) { return _z_zbuf_len(zbf) > (size_t)0; } uint8_t _z_zbuf_read(_z_zbuf_t *zbf) { return _z_iosli_read(&zbf->_ios); } @@ -218,7 +239,10 @@ uint8_t *_z_zbuf_get_wptr(const _z_zbuf_t *zbf) { return zbf->_ios._buf + zbf->_ void _z_zbuf_reset(_z_zbuf_t *zbf) { _z_iosli_reset(&zbf->_ios); } -void _z_zbuf_clear(_z_zbuf_t *zbf) { _z_iosli_clear(&zbf->_ios); } +void _z_zbuf_clear(_z_zbuf_t *zbf) { + _z_iosli_clear(&zbf->_ios); + _z_slice_simple_rc_drop(&zbf->_slice); +} void _z_zbuf_compact(_z_zbuf_t *zbf) { if ((zbf->_ios._r_pos != 0) || (zbf->_ios._w_pos != 0)) { diff --git a/src/session/query.c b/src/session/query.c index f9546990f..f3401dace 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -112,8 +112,8 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, } // Build the reply - _z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, msg->_payload, &msg->_commons._timestamp, - &msg->_encoding, kind, msg->_attachment); + _z_reply_t reply = _z_reply_create(&expanded_ke, zn->_local_zid, &msg->_payload, &msg->_commons._timestamp, + &msg->_encoding, kind, &msg->_attachment); bool drop = false; // Verify if this is a newer reply, free the old one in case it is diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 66e177597..0a39fe4e1 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -36,7 +36,10 @@ z_result_t _zp_multicast_read(_z_transport_multicast_t *ztm) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); _z_t_msg_clear(&t_msg); } - + ret = _z_multicast_update_rx_buffer(ztm); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -57,11 +60,11 @@ void *_zp_multicast_read_task(void *ztm_arg) { // Prepare the buffer _z_zbuf_reset(&ztm->_zbuf); - _z_slice_t addr = _z_slice_alias_buf(NULL, 0); + _z_slice_t addr = _z_slice_empty(); while (ztm->_read_task_running == true) { - // Read bytes from socket to the main buffer size_t to_read = 0; + // Read bytes from socket to the main buffer switch (ztm->_link._cap._flow) { case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -94,15 +97,13 @@ void *_zp_multicast_read_task(void *ztm_arg) { default: break; } - // Wrap the main buffer for to_read bytes + // Wrap the main buffer to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); while (_z_zbuf_len(&zbuf) > 0) { - z_result_t ret = _Z_RES_OK; - // Decode one session message _z_transport_message_t t_msg; - ret = _z_transport_message_decode(&t_msg, &zbuf); + z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); if (ret == _Z_RES_OK) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); @@ -119,9 +120,12 @@ void *_zp_multicast_read_task(void *ztm_arg) { continue; } } - // Move the read position of the read buffer _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); + if (_z_multicast_update_rx_buffer(ztm) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; + } } _z_mutex_unlock(&ztm->_mutex_rx); return NULL; diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 7ab3ad542..2832d2053 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -350,6 +350,27 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, return ret; } + +z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm) { + // Check if user or defragment buffer took ownership of buffer + if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr) { diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index b5062ebab..4c49d7b64 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -37,6 +37,10 @@ z_result_t _zp_raweth_read(_z_transport_multicast_t *ztm) { _z_t_msg_clear(&t_msg); } _z_slice_clear(&addr); + ret = _z_raweth_update_rx_buff(ztm); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -85,6 +89,10 @@ void *_zp_raweth_read_task(void *ztm_arg) { } _z_t_msg_clear(&t_msg); _z_slice_clear(&addr); + if (_z_raweth_update_rx_buff(ztm) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; + } } return NULL; } diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 59b57e512..660f49293 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -117,6 +117,26 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_mess return _z_raweth_recv_t_msg_na(ztm, t_msg, addr); } +z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm) { + // Check if user or defragment buffer took ownership of buffer + if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr) { _ZP_UNUSED(ztm); diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 922b59667..d6520b6c3 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -33,7 +33,10 @@ z_result_t _zp_unicast_read(_z_transport_unicast_t *ztu) { ret = _z_unicast_handle_transport_message(ztu, &t_msg); _z_t_msg_clear(&t_msg); } - + ret = _z_unicast_update_rx_buffer(ztu); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -95,27 +98,33 @@ void *_zp_unicast_read_task(void *ztu_arg) { // Mark the session that we have received data ztu->_received = true; - // Decode one session message - _z_transport_message_t t_msg; - z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); + while (_z_zbuf_len(&zbuf) > 0) { + // Decode one session message + _z_transport_message_t t_msg; + z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); - if (ret == _Z_RES_OK) { - ret = _z_unicast_handle_transport_message(ztu, &t_msg); if (ret == _Z_RES_OK) { - _z_t_msg_clear(&t_msg); + ret = _z_unicast_handle_transport_message(ztu, &t_msg); + if (ret == _Z_RES_OK) { + _z_t_msg_clear(&t_msg); + } else { + _Z_ERROR("Connection closed due to message processing error: %d", ret); + ztu->_read_task_running = false; + continue; + } } else { - _Z_ERROR("Connection closed due to message processing error: %d", ret); + _Z_ERROR("Connection closed due to malformed message: %d", ret); ztu->_read_task_running = false; continue; } - } else { - _Z_ERROR("Connection closed due to malformed message: %d", ret); - ztu->_read_task_running = false; - continue; } - // Move the read position of the read buffer _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); + + if (_z_unicast_update_rx_buffer(ztu) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztu->_read_task_running = false; + } } _z_mutex_unlock(&ztu->_mutex_rx); return NULL; diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index b500f55c9..be7d55446 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -215,6 +215,28 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t return ret; } + +z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu) { + // Check if user or defragment buffer took ownership of buffer + if (_z_zbuf_get_ref_count(&ztu->_zbuf) != 1) { + // Allocate a new buffer + size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); + _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); + if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztu->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztu->_zbuf); + ztu->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg) { _ZP_UNUSED(ztu); diff --git a/tests/z_client_test.c b/tests/z_client_test.c index d073bc554..229722a48 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -138,7 +138,7 @@ int main(int argc, char **argv) { z_owned_session_t s1; assert(z_open(&s1, z_move(config), NULL) == Z_OK); _z_string_t zid1 = format_id(&(_Z_RC_IN_VAL(z_loan(s1))->_local_zid)); - printf("Session 1 with PID: %s\n", _z_string_data(&zid1)); + printf("Session 1 with PID: %.*s\n", (int)z_string_len(&zid1), _z_string_data(&zid1)); _z_string_clear(&zid1); // Start the read session session lease loops @@ -154,7 +154,7 @@ int main(int argc, char **argv) { assert(z_open(&s2, z_move(config), NULL) == Z_OK); assert(z_internal_check(s2)); _z_string_t zid2 = format_id(&(_Z_RC_IN_VAL(z_loan(s2))->_local_zid)); - printf("Session 2 with PID: %s\n", _z_string_data(&zid2)); + printf("Session 2 with PID: %.*s\n", (int)z_string_len(&zid2), _z_string_data(&zid2)); _z_string_clear(&zid2); // Start the read session session lease loops diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index 383ba799d..9cbd017b5 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -79,7 +79,7 @@ int main(int argc, char **argv) { _z_slice_t id_as_bytes = _z_slice_alias_buf(_Z_RC_IN_VAL(z_loan(s1))->_local_zid.id, _z_id_len(_Z_RC_IN_VAL(z_loan(s1))->_local_zid)); _z_string_t zid1 = _z_string_convert_bytes(&id_as_bytes); - printf("Session 1 with PID: %s\n", z_string_data(&zid1)); + printf("Session 1 with PID: %.*s\n", (int)z_string_len(&zid1), z_string_data(&zid1)); _z_string_clear(&zid1); // Start the read session session lease loops @@ -98,7 +98,7 @@ int main(int argc, char **argv) { id_as_bytes = _z_slice_alias_buf(_Z_RC_IN_VAL(z_loan(s2))->_local_zid.id, _z_id_len(_Z_RC_IN_VAL(z_loan(s2))->_local_zid)); _z_string_t zid2 = _z_string_convert_bytes(&id_as_bytes); - printf("Session 2 with PID: %s\n", z_string_data(&zid2)); + printf("Session 2 with PID: %.*s\n", (int)z_string_len(&zid2), z_string_data(&zid2)); _z_string_clear(&zid2); // Start the read session session lease loops diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index 31a8faf71..7935b6c63 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -326,9 +326,10 @@ void test_simple_rc_clone_as_ptr(void) { _dummy_simple_rc_t *drc2 = _dummy_simple_rc_clone_as_ptr(&drc1); assert(drc2->_val != NULL); assert(!_Z_RC_IS_NULL(drc2)); - assert(_z_simple_rc_strong_count(drc2->_cnt) == 2); + assert(_dummy_simple_rc_count(drc2) == 2); assert(_dummy_simple_rc_eq(&drc1, drc2)); assert(!_dummy_simple_rc_drop(&drc1)); + assert(_dummy_simple_rc_count(drc2) == 1); assert(_dummy_simple_rc_drop(drc2)); z_free(drc2); } @@ -339,7 +340,7 @@ void test_simple_rc_copy(void) { _dummy_simple_rc_t drc2 = _dummy_simple_rc_null(); assert(!_dummy_simple_rc_eq(&drc1, &drc2)); _dummy_simple_rc_copy(&drc2, &drc1); - assert(_z_simple_rc_strong_count(drc2._cnt) == 2); + assert(_dummy_simple_rc_count(&drc2) == 2); assert(_dummy_simple_rc_eq(&drc1, &drc2)); assert(!_dummy_simple_rc_drop(&drc2)); assert(_dummy_simple_rc_drop(&drc1));