Skip to content

Commit

Permalink
Adding a function to wait for publisher confirms (#841)
Browse files Browse the repository at this point in the history
* Adding a function to wait for publisher confirms
- Also added an example program to demo it
  • Loading branch information
manchicken authored Oct 17, 2024
1 parent c3e4176 commit 7d12118
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 0 deletions.
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ target_link_libraries(amqp_listen PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_producer amqp_producer.c)
target_link_libraries(amqp_producer PRIVATE examples-common rabbitmq::rabbitmq)

add_executable(amqp_confirm_select amqp_confirm_select.c)
target_link_libraries(amqp_confirm_select PRIVATE examples-common rabbitmq::rabbitmq)

add_executable(amqp_connect_timeout amqp_connect_timeout.c)
target_link_libraries(amqp_connect_timeout PRIVATE examples-common rabbitmq::rabbitmq)

Expand Down
189 changes: 189 additions & 0 deletions examples/amqp_confirm_select.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>

#include "utils.h"

#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
#ifndef WINVER
#define WINVER 0x0502
#endif
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <winsock2.h>
#else
#include <sys/time.h>
#endif

#define SUMMARY_EVERY_US 5000

static void send_batch(amqp_connection_state_t conn, char const *queue_name,
int rate_limit, int message_count) {
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

char message[256];
amqp_bytes_t message_bytes;

for (i = 0; i < (int)sizeof(message); i++) {
message[i] = i & 0xff;
}

message_bytes.len = sizeof(message);
message_bytes.bytes = message;

for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();

die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"),
amqp_cstring_bytes(queue_name), 0, 0, NULL,
message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate =
countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, sent, countOverInterval,
(int)intervalRate);

previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}

while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
microsleep(2000);
now = now_microseconds();
}
}

{
uint64_t stop_time = now_microseconds();
int total_delta = (int)(stop_time - start_time);

printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n",
(message_count / (total_delta / 1000000.0)));
}
}

#define CONSUME_TIMEOUT_USEC 100
#define WAITING_TIMEOUT_USEC (30 * 1000)
void wait_for_acks(amqp_connection_state_t conn) {
uint64_t start_time = now_microseconds();
struct timeval timeout = {0, CONSUME_TIMEOUT_USEC};
uint64_t now = 0;
amqp_publisher_confirm_t result = {};

for (;;) {
amqp_rpc_reply_t ret;

now = now_microseconds();

if (now > start_time + WAITING_TIMEOUT_USEC) {
return;
}

amqp_maybe_release_buffers(conn);
ret = amqp_publisher_confirm_wait(conn, &timeout, &result);

if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type) {
if (AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
fprintf(stderr, "An unexpected method was received\n");
return;
} else if (AMQP_STATUS_TIMEOUT == ret.library_error) {
// Timeout means you're done; no publisher confirms were waiting!
return;
} else {
die_on_amqp_error(ret, "Waiting for publisher confirmation");
}
}

switch (result.method) {
case AMQP_BASIC_ACK_METHOD:
fprintf(stderr, "Got an ACK!\n");
fprintf(stderr, "Here's the ACK:\n");
fprintf(stderr, "\tdelivery_tag: «%" PRIu64 "»\n",
result.payload.ack.delivery_tag);
fprintf(stderr, "\tmultiple: «%d»\n", result.payload.ack.multiple);
break;
case AMQP_BASIC_NACK_METHOD:
fprintf(stderr, "NACK\n");
break;
case AMQP_BASIC_REJECT_METHOD:
fprintf(stderr, "REJECT\n");
break;
default:
fprintf(stderr, "Unexpected method «%s» is.\n",
amqp_method_name(result.method));
};
}
}

int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
int rate_limit;
int message_count;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;

if (argc < 5) {
fprintf(stderr,
"Usage: amqp_producer host port rate_limit message_count\n");
return 1;
}

hostname = argv[1];
port = atoi(argv[2]);
rate_limit = atoi(argv[3]);
message_count = atoi(argv[4]);

conn = amqp_new_connection();

socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}

status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}

die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

// Enable confirm_select
amqp_confirm_select(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Enable confirm-select");

send_batch(conn, "test queue", rate_limit, message_count);

wait_for_acks(conn);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
48 changes: 48 additions & 0 deletions include/rabbitmq-c/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,54 @@ AMQP_EXPORT
int AMQP_CALL amqp_set_rpc_timeout(amqp_connection_state_t state,
const struct timeval *timeout);

/**
* Possible payload permutations for publisher confirms.
**/
typedef union amqp_publisher_confirm_payload_t_ {
amqp_basic_ack_t ack; /* basic.ack */
amqp_basic_nack_t nack; /* basic.nack */
amqp_basic_reject_t reject; /* basic.reject */
} amqp_publisher_confirm_payload_t;

/**
* Return information from publisher confirm wait
**/
typedef struct amqp_publisher_confirm_t_ {
amqp_publisher_confirm_payload_t payload; /* The response payload; check the `method` value to see which value you should use in the union */
amqp_channel_t channel; /* The channel where the confirmation was received */
amqp_method_number_t method; /* The method which was received */
} amqp_publisher_confirm_t;

/**
* amqp_publisher_confirm_wait
*
* Wait for a publisher confirm when one or more channel is in select mode.
* If the response has a `reply_type` of `AMQP_RESPONSE_LIBRARY_EXCEPTION` _and_
* the `library_error` is `AMQP_STATUS_UNEXPECTED_STATE`, then the frame
* received was not an ack.
*
* In the event that there are no publisher confirms received during the
* allotted time, `reply_type` will be `AMQP_RESPONSE_LIBRARY_EXCEPTION`
* and the `library_error` will be `AMQP_STATUS_TIMEOUT`.
*
* When a publisher confirm is received, `reply_type` will equal
* `AMQP_RESPONSE_NORMAL`, and the `result` out parameter will
* contain all of the information you need:
*
* - The `channel` will identify which channel the publisher confirm was received on
* - The `method` will tell you whether this is an `ack`, `nack`, or `reject`
* - The `payload` is a union, and based on the `method` it will use one of `amqp_basic_ack_t`, `amqp_basic_nack_t`, or `amqp_basic_reject_t`
*
* \param [in] state connection state
* \param [in] timeout when waiting for the frame. Passing NULL will result in
* blocking behavior
* \param [out] The result of the publisher confirm wait.
*/
AMQP_EXPORT
amqp_rpc_reply_t AMQP_CALL amqp_publisher_confirm_wait(
amqp_connection_state_t state, const struct timeval *timeout,
amqp_publisher_confirm_t *result);

AMQP_END_DECLS

#endif /* RABBITMQ_C_RABBITMQ_C_H */
55 changes: 55 additions & 0 deletions librabbitmq/amqp_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,58 @@ int amqp_set_rpc_timeout(amqp_connection_state_t state,
}
return AMQP_STATUS_OK;
}

amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state,
const struct timeval *timeout,
amqp_publisher_confirm_t *result) {
int res;
amqp_frame_t frame;
amqp_rpc_reply_t ret;

memset(&ret, 0x0, sizeof(ret));
memset(result, 0x0, sizeof(amqp_publisher_confirm_t));

res = amqp_simple_wait_frame_noblock(state, &frame, timeout);

if (AMQP_STATUS_OK != res) {
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = res;
return ret;
} else if (AMQP_FRAME_METHOD != frame.frame_type ||
(AMQP_BASIC_ACK_METHOD != frame.payload.method.id &&
AMQP_BASIC_NACK_METHOD != frame.payload.method.id &&
AMQP_BASIC_REJECT_METHOD != frame.payload.method.id)) {
amqp_put_back_frame(state, &frame);
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
return ret;
}

switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
memcpy(&(result->payload.ack), frame.payload.method.decoded,
sizeof(amqp_basic_ack_t));
break;

case AMQP_BASIC_NACK_METHOD:
memcpy(&(result->payload.nack), frame.payload.method.decoded,
sizeof(amqp_basic_nack_t));
break;

case AMQP_BASIC_REJECT_METHOD:
memcpy(&(result->payload.reject), frame.payload.method.decoded,
sizeof(amqp_basic_reject_t));
break;

default:
amqp_put_back_frame(state, &frame);
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNSUPPORTED;
return ret;
}
result->method = frame.payload.method.id;
result->channel = frame.channel;
ret.reply_type = AMQP_RESPONSE_NORMAL;

return ret;
}

0 comments on commit 7d12118

Please sign in to comment.