Skip to content

Commit

Permalink
Configurable AMQP1.0 consumer credits
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Sep 26, 2023
1 parent daebb48 commit 5f3ca27
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func RootCmd() *cobra.Command {
start(cfg, common.AMQP, common.AMQP)
},
}
amqp_amqp.Flags().IntVarP(&cfg.Amqp.ConsumerCredits, "amqp-consumer-credits", "", 1, "AMQP 1.0 consumer credits")

amqp_stomp = &cobra.Command{
Use: "amqp-stomp",
Expand Down Expand Up @@ -79,6 +80,7 @@ func RootCmd() *cobra.Command {
start(cfg, common.STOMP, common.AMQP)
},
}
stomp_amqp.Flags().IntVarP(&cfg.Amqp.ConsumerCredits, "amqp-consumer-credits", "", 1, "AMQP 1.0 consumer credits")

stomp_mqtt = &cobra.Command{
Use: "stomp-mqtt",
Expand All @@ -101,6 +103,7 @@ func RootCmd() *cobra.Command {
start(cfg, common.MQTT, common.AMQP)
},
}
mqtt_amqp.Flags().IntVarP(&cfg.Amqp.ConsumerCredits, "amqp-consumer-credits", "", 1, "AMQP 1.0 consumer credits")

mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewConsumer(cfg config.Config, id int) *Amqp10Consumer {
}

func (c Amqp10Consumer) Start(subscribed chan bool) {
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{Durability: amqp.DurabilityUnsettledState})
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{Durability: amqp.DurabilityUnsettledState, Credit: int32(c.Config.Amqp.ConsumerCredits)})
if err != nil {
log.Error("consumer failed to create a receiver", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
return
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package config

import "time"

type AmqpOptions struct {
ConsumerCredits int
}

type Config struct {
PublisherUri string
ConsumerUri string
Expand All @@ -15,4 +19,5 @@ type Config struct {
Rate int
Duration time.Duration
UseMillis bool
Amqp AmqpOptions
}

0 comments on commit 5f3ca27

Please sign in to comment.