Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer: refactor buffer to be dynamically-sized #53

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
152 changes: 114 additions & 38 deletions src/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ mpd_async_new(int fd)

async->fd = fd;
mpd_error_init(&async->error);

mpd_buffer_init(&async->input);
mpd_buffer_init(&async->output);

Expand All @@ -92,6 +91,8 @@ mpd_async_free(struct mpd_async *async)

mpd_socket_close(async->fd);
mpd_error_deinit(&async->error);
mpd_buffer_deinit(&async->input);
mpd_buffer_deinit(&async->output);
free(async);
}

Expand Down Expand Up @@ -179,17 +180,29 @@ mpd_async_read(struct mpd_async *async)
{
size_t room;
ssize_t nbytes;
char *write_p;
const size_t min_size = 256;

assert(async != NULL);
assert(async->fd != MPD_INVALID_SOCKET);
assert(!mpd_error_is_defined(&async->error));

if (!mpd_buffer_make_room(&async->input, min_size)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you're receiving data from a server which pushes a terabyte to you? DoS bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my understanding, maybe i'm missing something.
I did not change the reading functions to read all received data. It can still fail for large messages. Details below:

mpd_async_read is only used when (i) you are going to read the buffer or (ii) writing/flushing output data. In addition, mpd_buffer_make_room will only increase the size if there wasn't enough space available.

For (i), you will increase the buffer by 256 bytes (if needed) and read data out. If the message was larger than 256 bytes, an error will occur in mpd_async_recv_line. Even if you divide the terabyte message in chunks of 256 bytes, mpd_async_recv_line will continually free buffer area, so more allocations will not happen.

For (ii), the flush/write will stop once output data is no longer available. Output data is generated locally. Assuming you wrote 32MB of output data [1] (13 loops on mpd_sync_send_command_v) and the input buffer was always full, the new input buffer size is only 7424 bytes.

You can increase the input buffer size by calling mpd_async_recv_raw; but this is done deliberately.

[1] MPD will not accept this, but lets say it does for this example.

If you prefer, we could provide in the meson configure the options for the first and limit allocation sizes for the buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're wrong. You assume that the message will be delivered in chunks of 256 bytes, but why do you believe that is true?
What if a rogue server sends a single line which is 1 TB long?

Copy link
Contributor Author

@cataldor cataldor Apr 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two cases if you send a single 1 TB line
(i) if you are expecting a line, mpd_async_recv_line will not be able to read the 1 TB. The maximum size it can increase the input buffer is min_size, which is currently 256 bytes. If it does not find \n in the input buffer, an error is set.

(ii) You can ask mpd_async_recv_raw to receive a 1TB. But this is the user choice. I know Linux does not normally respect this, but this would be the ENOMEM case.

Case (ii) is not currently doing this because i forgot to change it. It will only receive up to the current size available. mpc_async_recv_raw() is actually working properly: it reads the available input buffer size.

mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error,
"Out of memory for input buffer");
return false;
}
room = mpd_buffer_room(&async->input);
if (room == 0)
return true;

nbytes = recv(async->fd, mpd_buffer_write(&async->input), room,
MSG_DONTWAIT);
write_p = mpd_buffer_write(&async->input);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can mpd_buffer_write() fail, after mpd_buffer_room() has already verified that buffer space is available?
I'm puzzled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because mpd_buffer_room() does not initialize the data, that is the responsibility of mpd_buffer_write()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design was your idea. And I say it's overly complicated, complex and thus error prone.

if (write_p == NULL) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error,
"Out of memory for input buffer");
return false;
}
nbytes = recv(async->fd, write_p, room, MSG_DONTWAIT);
if (nbytes < 0) {
/* I/O error */

Expand All @@ -216,6 +229,7 @@ mpd_async_write(struct mpd_async *async)
{
size_t size;
ssize_t nbytes;
char *dst;

assert(async != NULL);
assert(async->fd != MPD_INVALID_SOCKET);
Expand All @@ -225,8 +239,14 @@ mpd_async_write(struct mpd_async *async)
if (size == 0)
return true;

nbytes = send(async->fd, mpd_buffer_read(&async->output), size,
MSG_DONTWAIT);
dst = mpd_buffer_read(&async->output);
if (dst == NULL) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error,
"Out of memory for output buffer");
return NULL;
}
nbytes = send(async->fd, dst, size, MSG_DONTWAIT);
if (nbytes < 0) {
/* I/O error */

Expand Down Expand Up @@ -276,39 +296,32 @@ mpd_async_io(struct mpd_async *async, enum mpd_async_event events)
return true;
}

bool
mpd_async_send_command_v(struct mpd_async *async, const char *command,
va_list args)
/* Insert the argument list args into buffer starting from start up to the
* available buffer space. The inserted arguments are quoted, and special
* characters are properly escaped. Because of these operations, the length to
* be written is only known after the fact.
*
* returns false if the current space available in buffer is insufficient or
* true otherwise.
*/
static bool
quote_vargs(struct mpd_buffer *buffer, char *start, char **end_pos,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this function do?
Documentation missing!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 14a9563

va_list args)
{
size_t room, length;
char *dest, *end, *p;
char *end, *p;
const char *arg;

assert(async != NULL);
assert(command != NULL);

if (mpd_error_is_defined(&async->error))
return false;

room = mpd_buffer_room(&async->output);
length = strlen(command);
if (room <= length)
return false;

dest = mpd_buffer_write(&async->output);
/* -1 because we reserve space for the \n character */
end = dest + room - 1;

/* copy the command (no quoting, we asumme it is "clean") */

memcpy(dest, command, length);
p = dest + length;
/* mpd_async_send_command_v() guarantees that mpd_buffer_room() has at
* least 1 position available (length + 1)
*/
assert(mpd_buffer_room(buffer) >= 1);
end = start + mpd_buffer_room(buffer) - 1;
p = start;

/* now append all arguments (quoted) */

while ((arg = va_arg(args, const char *)) != NULL) {
/* append a space separator */

if (p >= end)
return false;

Expand All @@ -317,17 +330,66 @@ mpd_async_send_command_v(struct mpd_async *async, const char *command,
/* quote the argument into the destination buffer */

p = quote(p, end, arg);
assert(p == NULL || (p >= dest && p <= end));
assert(p == NULL || (p >= start && p <= end));
if (p == NULL)
return false;
}


/* append the newline to finish this command */

*p++ = '\n';
*end_pos = p;
return true;
}

bool
mpd_async_send_command_v(struct mpd_async *async, const char *command,
va_list args)
{
const char *error_msg = "Out of memory for output buffer";
char *end_pos = NULL, *write_p;
bool success = false;
size_t length;
va_list cargs;

assert(async != NULL);
assert(command != NULL);

if (mpd_error_is_defined(&async->error))
return false;

length = strlen(command);
/* we need a '\n' at the end */
if (!mpd_buffer_make_room(&async->output, length + 1)) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error, error_msg);
return false;
}

/* copy the command (no quoting, we assume it is "clean") */
write_p = mpd_buffer_write(&async->output);
if (write_p == NULL) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error, error_msg);
return NULL;
}
memcpy(write_p, command, length);
mpd_buffer_expand(&async->output, length);

while (!success) {
va_copy(cargs, args);
write_p = mpd_buffer_write(&async->output);
success = quote_vargs(&async->output, write_p, &end_pos, cargs);
va_end(cargs);
if (!success) {
if (!mpd_buffer_double_buffer_size(&async->output)) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error, error_msg);
return false;
}
}
}

mpd_buffer_expand(&async->output, p - dest);
mpd_buffer_expand(&async->output, end_pos - write_p);
return true;
}

Expand Down Expand Up @@ -360,7 +422,12 @@ mpd_async_recv_line(struct mpd_async *async)
return NULL;

src = mpd_buffer_read(&async->input);
assert(src != NULL);
if (src == NULL) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error,
"Out of memory for input buffer");
return NULL;
}
newline = memchr(src, '\n', size);
if (newline == NULL) {
/* line is not finished yet */
Expand All @@ -384,14 +451,23 @@ mpd_async_recv_line(struct mpd_async *async)
size_t
mpd_async_recv_raw(struct mpd_async *async, void *dest, size_t length)
{
char *src;
size_t max_size = mpd_buffer_size(&async->input);

if (max_size == 0)
return 0;

if (length > max_size)
length = max_size;

memcpy(dest, mpd_buffer_read(&async->input), length);
src = mpd_buffer_read(&async->input);
if (src == NULL) {
mpd_error_code(&async->error, MPD_ERROR_OOM);
mpd_error_message(&async->error,
"Out of memory for input buffer");
return 0;
}
memcpy(dest, src, length);
mpd_buffer_consume(&async->input, length);
return length;
}
Loading