Is this usage of amqp_channel_flow correct?
Version Details
RabbitMQ 3.9.13 on Erlang 24.2.1
rabbitmq-c client v0.11.0
Scenario
The consumer consumes messages from a queue declared with the exclusive and auto-delete bits set. The queue has an upper limit set via x-max-length-bytes, and overflow is set to reject-publish. Internally, the consumer tracks how many bytes of data are currently being processed and uses amqp_channel_flow to pause and resume delivery of messages from the queue.
Before sending, the publisher checks whether the queue is ready to accept the message. To do this, it redeclares the queue in passive mode and checks consumer_count. If the consumer count is zero, it retries after some time.
When the consumer calls amqp_channel_flow(conn, 1, 0) to pause reading from the queue, a connection exception is thrown: operation channel.flow caused a connection exception not_implemented: "active=false"
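To make the byte tracking in the scenario concrete, here is a minimal sketch of the pause/resume decision on its own, with no broker calls. Everything in it (byte_budget_t, budget_init, budget_on_receive, budget_on_done, and the two watermarks) is a hypothetical name made up for illustration and is not part of rabbitmq-c. Using separate high and low watermarks is also an assumption, chosen so the consumer does not flip between paused and resumed on every message.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper (not part of rabbitmq-c): tracks bytes in processing. */
typedef struct {
    size_t in_flight;  /* bytes received but not yet fully processed */
    size_t low_water;  /* resume once in_flight drops to this or below */
    size_t high_water; /* pause once in_flight reaches this or above */
    bool paused;       /* true while the consumer wants no more messages */
} byte_budget_t;

static void budget_init(byte_budget_t *b, size_t low_water, size_t high_water) {
    assert(low_water <= high_water);
    b->in_flight = 0;
    b->low_water = low_water;
    b->high_water = high_water;
    b->paused = false;
}

/* Record a received message body of len bytes.
 * Returns true only on the transition into the paused state. */
static bool budget_on_receive(byte_budget_t *b, size_t len) {
    b->in_flight += len;
    if (!b->paused && b->in_flight >= b->high_water) {
        b->paused = true;
        return true;
    }
    return false;
}

/* Record that len bytes have finished processing.
 * Returns true only on the transition back to the running state. */
static bool budget_on_done(byte_budget_t *b, size_t len) {
    assert(len <= b->in_flight);
    b->in_flight -= len;
    if (b->paused && b->in_flight <= b->low_water) {
        b->paused = false;
        return true;
    }
    return false;
}
```

In the consumer described here, a true return from budget_on_receive is the point where amqp_channel_flow(conn, 1, false) is called, and a true return from budget_on_done is the point where amqp_channel_flow(conn, 1, true) is called.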
Pseudo Code of Consumer
{
    // Declare an exclusive, auto-delete queue Q1 (passive=0, durable=0, exclusive=1, auto_delete=1)
    amqp_queue_declare_ok_t *declare_ok = amqp_queue_declare(conn, channel, Q1, 0, 0, 1, 1, amqp_empty_table);
    // Start consuming from Q1 (no_local=0, no_ack=1, exclusive=0)
    amqp_basic_consume(conn, 1, Q1, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    while (1)
    {
        amqp_rpc_reply_t res;
        amqp_envelope_t envelope;
        amqp_maybe_release_buffers(conn);
        res = amqp_consume_message(conn, &envelope, NULL, 0);
        if (AMQP_RESPONSE_NORMAL != res.reply_type) {
            break;
        }
        // Under some condition (too much content is already in the pipeline) the consumer
        // wants to stop receiving messages for some time, so it sends channel.flow.
        // This call throws the connection exception:
        // operation channel.flow caused a connection exception not_implemented: "active=false"
        amqp_channel_flow(conn, 1, false);
        // After some time, resume the consumer so that it accepts more messages from the queue
        amqp_channel_flow(conn, 1, true);
    }
}
Pseudo Code of Publisher
{
    // Check whether a consumer is ready to accept messages before sending:
    // redeclare the exclusive queue in passive mode (passive=1)
    amqp_queue_declare_ok_t *declare_ok = amqp_queue_declare(conn, channel, Q1, 1, 0, 0, 1, amqp_empty_table);
    if (declare_ok && declare_ok->consumer_count)
        // publish message
    else
        // retry after some time
}