Skip to content

Commit

Permalink
Fix ConsumerStartupDelay support
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Sep 27, 2024
1 parent 18eef02 commit 7dcb1b6
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 65 deletions.
137 changes: 107 additions & 30 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,96 @@ import (

func TestPublishConsume(t *testing.T) {
type test struct {
publish string
consume string
priority string // expected/default message priority
publishProto string
publishToPrefix string
consumeProto string
consumeFromPrefix string
msgPriority string // expected/default message priority
}

tests := []test{
{publish: "stomp", consume: "stomp", priority: ""},
{publish: "stomp", consume: "amqp", priority: "4"},
{publish: "stomp", consume: "mqtt", priority: ""},
{publish: "amqp", consume: "amqp", priority: "0"}, // https://github.com/Azure/go-amqp/issues/313
{publish: "amqp", consume: "stomp", priority: "0"}, // https://github.com/Azure/go-amqp/issues/313
{publish: "amqp", consume: "mqtt", priority: ""},
{publish: "mqtt", consume: "mqtt", priority: ""},
{publish: "mqtt", consume: "stomp", priority: ""},
{publish: "mqtt", consume: "amqp", priority: "4"},
{
publishProto: "amqp",
publishToPrefix: "/queues/",
consumeProto: "amqp",
consumeFromPrefix: "/queues/",
msgPriority: "0", // https://github.com/Azure/go-amqp/issues/313
},
{
publishProto: "stomp",
publishToPrefix: "/topic/",
consumeProto: "amqp",
consumeFromPrefix: "/queues/",
msgPriority: "4",
},
{
publishProto: "mqtt",
publishToPrefix: "/topic/",
consumeProto: "amqp",
consumeFromPrefix: "/queues/",
msgPriority: "4",
},
{
publishProto: "amqp",
publishToPrefix: "/exchanges/amq.topic/",
consumeProto: "stomp",
consumeFromPrefix: "/topic/",
msgPriority: "0", // https://github.com/Azure/go-amqp/issues/313
},
{
publishProto: "amqp",
publishToPrefix: "/exchanges/amq.topic/",
consumeProto: "mqtt",
consumeFromPrefix: "/topic/",
msgPriority: "",
},
{
publishProto: "stomp",
publishToPrefix: "/topic/",
consumeProto: "stomp",
consumeFromPrefix: "/topic/",
msgPriority: "",
},
{
publishProto: "stomp",
publishToPrefix: "/topic/",
consumeProto: "mqtt",
consumeFromPrefix: "/topic/",
msgPriority: "",
},
{
publishProto: "mqtt",
publishToPrefix: "/topic/",
consumeProto: "mqtt",
consumeFromPrefix: "/topic/",
msgPriority: "",
},
{
publishProto: "mqtt",
publishToPrefix: "/topic/",
consumeProto: "stomp",
consumeFromPrefix: "/topic/",
msgPriority: "",
},
}

for _, tc := range tests {
t.Run(tc.publish+"-"+tc.consume, func(t *testing.T) {
t.Run(tc.publishProto+"-"+tc.consumeProto, func(t *testing.T) {
rootCmd := RootCmd()

topic := "/topic/" + tc.publish + tc.consume
args := []string{tc.publish + "-" + tc.consume,
publishTo := tc.publishToPrefix + tc.publishProto + tc.consumeProto
consumeFrom := tc.consumeFromPrefix + tc.publishProto + tc.consumeProto
args := []string{tc.publishProto + "-" + tc.consumeProto,
"-C", "1",
"-D", "1",
"-t", topic,
"-T", topic,
"--queues", "classic",
"--queue-durability", "none"}
"-t", publishTo,
"-T", consumeFrom,
"--queue-durability", "none",
"--time", "3s", // don't want to long in case of issues
}
if tc.consumeProto == "amqp" {
args = append(args, "--queues", "classic", "--cleanup-queues=true")
}
rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))

Expand All @@ -64,24 +125,38 @@ func TestPublishConsume(t *testing.T) {

func TestPublishWithPriorities(t *testing.T) {
type test struct {
publish string
consume string
publishProto string
publishToPrefix string
consumeProto string
consumeFromPrefix string
}

tests := []test{
// mqtt has no concept of message priority
{publish: "stomp", consume: "stomp"},
{publish: "stomp", consume: "amqp"},
{publish: "amqp", consume: "amqp"},
{publish: "amqp", consume: "stomp"},
{publishProto: "stomp", publishToPrefix: "/topic/", consumeProto: "stomp", consumeFromPrefix: "/topic/"},
{publishProto: "stomp", publishToPrefix: "/topic/", consumeProto: "amqp", consumeFromPrefix: "/queues/"},
{publishProto: "amqp", publishToPrefix: "/queues/", consumeProto: "amqp", consumeFromPrefix: "/queues/"},
{publishProto: "amqp", publishToPrefix: "/exchanges/amq.topic/", consumeProto: "stomp", consumeFromPrefix: "/topic/"},
}

for _, tc := range tests {
t.Run(tc.publish+"-"+tc.consume, func(t *testing.T) {
t.Run(tc.publishProto+"-"+tc.consumeProto, func(t *testing.T) {
rootCmd := RootCmd()

topic := "/topic/" + tc.publish + tc.consume
args := []string{tc.publish + "-" + tc.consume, "-C", "1", "-D", "1", "-t", topic, "-T", topic, "--message-priority", "13", "--queue-durability", "none"}
publishTo := tc.publishToPrefix + tc.publishProto + tc.consumeProto
consumeFrom := tc.consumeFromPrefix + tc.publishProto + tc.consumeProto
args := []string{
tc.publishProto + "-" + tc.consumeProto,
"-C", "1",
"-D", "1",
"-t", publishTo,
"-T", consumeFrom,
"--message-priority", "13",
"--queue-durability", "none",
"--time", "3s"}
if tc.consumeProto == "amqp" {
args = append(args, "--queues", "classic", "--cleanup-queues=true")
}
rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))

Expand All @@ -107,8 +182,10 @@ func TestConsumerStartupDelay(t *testing.T) {
"-z", "5s",
"-r", "1",
"-D", "1",
"-t", "/topic/consumer-startup-delay",
"-T", "/topic/consumer-startup-delay",
"-t", "/queues/consumer-startup-delay",
"-T", "/queues/consumer-startup-delay",
"--queues", "classic",
"--cleanup-queues=true",
"--consumer-startup-delay", "3s"}
rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))
Expand Down
58 changes: 41 additions & 17 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/metrics"
"github.com/rabbitmq/omq/pkg/mgmt"
"github.com/rabbitmq/omq/pkg/utils"
"github.com/rabbitmq/omq/pkg/version"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -54,21 +55,27 @@ func RootCmd() *cobra.Command {
Use: "amqp-amqp",
Aliases: []string{"amqp"},
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.AMQP, common.AMQP)
cfg.PublisherProto = config.AMQP
cfg.ConsumerProto = config.AMQP
start(cfg)
},
}

amqp_stomp = &cobra.Command{
Use: "amqp-stomp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.AMQP, common.STOMP)
cfg.PublisherProto = config.AMQP
cfg.ConsumerProto = config.STOMP
start(cfg)
},
}

amqp_mqtt = &cobra.Command{
Use: "amqp-mqtt",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.AMQP, common.MQTT)
cfg.PublisherProto = config.AMQP
cfg.ConsumerProto = config.MQTT
start(cfg)
},
}
amqp_mqtt.Flags().IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
Expand All @@ -79,21 +86,27 @@ func RootCmd() *cobra.Command {
Use: "stomp-stomp",
Aliases: []string{"stomp"},
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.STOMP, common.STOMP)
cfg.PublisherProto = config.STOMP
cfg.ConsumerProto = config.STOMP
start(cfg)
},
}

stomp_amqp = &cobra.Command{
Use: "stomp-amqp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.STOMP, common.AMQP)
cfg.PublisherProto = config.STOMP
cfg.ConsumerProto = config.AMQP
start(cfg)
},
}

stomp_mqtt = &cobra.Command{
Use: "stomp-mqtt",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.STOMP, common.MQTT)
cfg.PublisherProto = config.STOMP
cfg.ConsumerProto = config.MQTT
start(cfg)
},
}
stomp_mqtt.Flags().IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
Expand All @@ -104,7 +117,9 @@ func RootCmd() *cobra.Command {
Use: "mqtt-mqtt",
Aliases: []string{"mqtt"},
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.MQTT, common.MQTT)
cfg.PublisherProto = config.MQTT
cfg.ConsumerProto = config.MQTT
start(cfg)
},
}
mqtt_mqtt.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
Expand All @@ -117,7 +132,9 @@ func RootCmd() *cobra.Command {
mqtt_amqp = &cobra.Command{
Use: "mqtt-amqp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.MQTT, common.AMQP)
cfg.PublisherProto = config.MQTT
cfg.ConsumerProto = config.AMQP
start(cfg)
},
}
mqtt_amqp.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
Expand All @@ -127,7 +144,9 @@ func RootCmd() *cobra.Command {
mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Run: func(cmd *cobra.Command, args []string) {
start(cfg, common.MQTT, common.STOMP)
cfg.PublisherProto = config.MQTT
cfg.ConsumerProto = config.STOMP
start(cfg)
},
}
mqtt_stomp.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
Expand Down Expand Up @@ -213,9 +232,9 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().
IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt, "The number of messages to consume per consumer (default=MaxInt)")
rootCmd.PersistentFlags().
StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq", "The topic/terminus to publish to (%d will be replaced with the publisher's id)")
StringVarP(&cfg.PublishTo, "publish-to", "t", "/queues/omq-%d", "The topic/terminus to publish to (%d will be replaced with the publisher's id)")
rootCmd.PersistentFlags().
StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq", "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/queues/omq-%d", "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
rootCmd.PersistentFlags().
VarP(enumflag.New(&cfg.Queues, "queues", config.QueueTypes, enumflag.EnumCaseInsensitive), "queues", "", "Type of queues to declare (or `predeclared` to use existing queues)")
rootCmd.PersistentFlags().
Expand Down Expand Up @@ -262,8 +281,8 @@ func RootCmd() *cobra.Command {
return rootCmd
}

func start(cfg config.Config, publisherProto common.Protocol, consumerProto common.Protocol) {
if cfg.ConsumerLatency != 0 && consumerProto == common.MQTT {
func start(cfg config.Config) {
if cfg.ConsumerLatency != 0 && cfg.ConsumerProto == config.MQTT {
log.Error("Consumer latency is not supported for MQTT consumers")
os.Exit(1)
}
Expand Down Expand Up @@ -293,11 +312,16 @@ func start(cfg config.Config, publisherProto common.Protocol, consumerProto comm
// if --consumer-startup-delay is not set, we want to start
// all the consumers before we start any publishers
if cfg.ConsumerStartupDelay == 0 {
startConsumers(ctx, consumerProto, &wg)
startConsumers(ctx, cfg.ConsumerProto, &wg)
} else {
// when consumers start with a delay, we still want the queues
// to be present so that publishers can create message backlogs
for i := 1; i <= cfg.Consumers; i++ {
mgmt.DeclareAndBind(cfg, utils.InjectId(cfg.ConsumeFrom, i), i)
}
go func() {
time.Sleep(cfg.ConsumerStartupDelay)
startConsumers(ctx, consumerProto, &wg)
startConsumers(ctx, cfg.ConsumerProto, &wg)
}()
}

Expand All @@ -306,7 +330,7 @@ func start(cfg config.Config, publisherProto common.Protocol, consumerProto comm
n := i
go func() {
defer wg.Done()
p, err := common.NewPublisher(publisherProto, cfg, n)
p, err := common.NewPublisher(cfg.PublisherProto, cfg, n)
if err != nil {
log.Error("Error creating publisher: ", "error", err)
os.Exit(1)
Expand All @@ -327,7 +351,7 @@ func start(cfg config.Config, publisherProto common.Protocol, consumerProto comm
wg.Wait()
}

func startConsumers(ctx context.Context, consumerProto common.Protocol, wg *sync.WaitGroup) {
func startConsumers(ctx context.Context, consumerProto config.Protocol, wg *sync.WaitGroup) {
for i := 1; i <= cfg.Consumers; i++ {
subscribed := make(chan bool)
n := i
Expand Down
24 changes: 8 additions & 16 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,21 @@ type Consumer interface {
Start(context.Context, chan bool)
}

type Protocol int

const (
AMQP Protocol = iota
STOMP
MQTT
)

func NewPublisher(protocol Protocol, cfg config.Config, id int) (Publisher, error) {
func NewPublisher(protocol config.Protocol, cfg config.Config, id int) (Publisher, error) {
switch protocol {
case AMQP:
case config.AMQP:
p := amqp10_client.NewPublisher(cfg, id)
if p == nil {
return nil, fmt.Errorf("failed to create an AMQP-1.0 publisher")
}
return p, nil
case STOMP:
case config.STOMP:
p := stomp_client.NewPublisher(cfg, id)
if p == nil {
return nil, fmt.Errorf("failed to create a STOMP publisher")
}
return p, nil
case MQTT:
case config.MQTT:
p := mqtt_client.NewPublisher(cfg, id)
if p == nil {
return nil, fmt.Errorf("failed to create an MQTT publisher")
Expand All @@ -51,21 +43,21 @@ func NewPublisher(protocol Protocol, cfg config.Config, id int) (Publisher, erro
return nil, fmt.Errorf("unknown protocol")
}

func NewConsumer(protocol Protocol, cfg config.Config, id int) (Consumer, error) {
func NewConsumer(protocol config.Protocol, cfg config.Config, id int) (Consumer, error) {
switch protocol {
case AMQP:
case config.AMQP:
c := amqp10_client.NewConsumer(cfg, id)
if c == nil {
return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer")
}
return c, nil
case STOMP:
case config.STOMP:
c := stomp_client.NewConsumer(cfg, id)
if c == nil {
return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer")
}
return c, nil
case MQTT:
case config.MQTT:
c := mqtt_client.NewConsumer(cfg, id)
if c == nil {
return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer")
Expand Down
Loading

0 comments on commit 7dcb1b6

Please sign in to comment.