diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index f17e9b1..805261b 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -177,7 +177,7 @@ func TestPublishWithPriorities(t *testing.T) { } } -func TestAMQPStreamFilters(t *testing.T) { +func TestAMQPStreamAppPropertyFilters(t *testing.T) { defer metrics.Reset() rootCmd := RootCmd() @@ -207,6 +207,36 @@ func TestAMQPStreamFilters(t *testing.T) { }, 2*time.Second, 100*time.Millisecond) } +func TestAMQPStreamPropertyFilters(t *testing.T) { + defer metrics.Reset() + + rootCmd := RootCmd() + + args := []string{"amqp", + "-C", "3", + "--publish-to", "/queues/stream-with-property-filters", + "--consume-from", "/queues/stream-with-property-filters", + "--amqp-subject", "foo,bar,baz", + "--amqp-property-filter", "subject=baz", + "--queues", "stream", + "--cleanup-queues=true", + "--time", "2s", + } + + rootCmd.SetArgs(args) + fmt.Println("Running test: omq", strings.Join(args, " ")) + err := rootCmd.Execute() + assert.Nil(t, err) + + assert.Eventually(t, func() bool { + return 3 == metrics.MessagesPublished.Get() + + }, 2*time.Second, 100*time.Millisecond) + assert.Eventually(t, func() bool { + return 1 == metrics.MessagesConsumedNormalPriority.Get() + }, 2*time.Second, 100*time.Millisecond) +} + func TestFanInFromMQTTtoAMQP(t *testing.T) { defer metrics.Reset() diff --git a/cmd/root.go b/cmd/root.go index 4d77a99..4fcef5d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -41,6 +41,7 @@ var ( var metricTags []string var amqpAppProperties []string var amqpAppPropertyFilters []string +var amqpPropertyFilters []string func Execute() { rootCmd := RootCmd() @@ -226,6 +227,17 @@ func RootCmd() *cobra.Command { cfg.Amqp.AppPropertyFilters[parts[0]] = parts[1] } + // AMQP property filters + cfg.Amqp.PropertyFilters = make(map[string]string) + for _, filter := range amqpPropertyFilters { + parts := strings.SplitN(filter, "=", 2) + if len(parts) != 2 { + _, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid AMQP property filter: %s, use key=filterExpression format\n", filter) + os.Exit(1) + } + cfg.Amqp.PropertyFilters[parts[0]] = parts[1] + } + // split metric tags into key-value pairs cfg.MetricTags = make(map[string]string) for _, tag := range metricTags { @@ -270,7 +282,7 @@ func RootCmd() *cobra.Command { BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)") rootCmd.PersistentFlags(). VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)") - rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject") + rootCmd.PersistentFlags().StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, "AMQP 1.0 message subject(s)") rootCmd.PersistentFlags().StringVar(&cfg.Amqp.BindingKey, "amqp-binding-key", "", "AMQP 1.0 consumer binding key") rootCmd.PersistentFlags(). BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, "Send settled messages (fire and forget)") @@ -293,6 +305,7 @@ func RootCmd() *cobra.Command { rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0, "Delay consumer startup to allow a backlog of messages to build up (eg. 10s)") rootCmd.PersistentFlags().StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{}, "AMQP application properties, eg. key1=val1,val2") rootCmd.PersistentFlags().StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, "AMQP application property filters, eg. key1=$p:prefix") + rootCmd.PersistentFlags().StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=$p:prefix") rootCmd.AddCommand(amqp_amqp) rootCmd.AddCommand(amqp_stomp) diff --git a/go.mod b/go.mod index 3abd282..ec469c2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/rabbitmq/omq go 1.23.0 +replace github.com/Azure/go-amqp => github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f + require ( github.com/Azure/go-amqp v1.2.0 github.com/VictoriaMetrics/metrics v1.35.1 @@ -31,16 +33,16 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect - golang.org/x/net v0.29.0 // indirect + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.25.0 // indirect + golang.org/x/sys v0.26.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( github.com/go-stomp/stomp/v3 v3.1.2 github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7 + github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0 github.com/spf13/pflag v1.0.5 // indirect ) diff --git a/go.sum b/go.sum index 8c6b384..8b3e70d 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,3 @@ -github.com/Azure/go-amqp v1.1.0 h1:XUhx5f4lZFVf6LQc5kBUFECW0iJW9VLxKCYrBeGwl0U= -github.com/Azure/go-amqp v1.1.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A= -github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I= -github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/VictoriaMetrics/metrics v1.35.1 h1:o84wtBKQbzLdDy14XeskkCZih6anG+veZ1SwJHFGwrU= github.com/VictoriaMetrics/metrics v1.35.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= @@ -12,8 +6,6 @@ github.com/charmbracelet/lipgloss v0.13.0 h1:4X3PPeoWEDCMvzDvGmTajSyYPcZM4+y8sCA github.com/charmbracelet/lipgloss v0.13.0/go.mod h1:nw4zy0SBX/F/eAO1cWdcvy6qnkDUxr8Lw7dvFrAIbbY= github.com/charmbracelet/log v0.4.0 h1:G9bQAcx8rWA2T3pWvx7YtPTPwgqpk7D68BX21IRW8ZM= github.com/charmbracelet/log v0.4.0/go.mod h1:63bXt/djrizTec0l11H20t8FDSvA4CRZJ1KH22MdptM= -github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqosGDEY= -github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= github.com/charmbracelet/x/ansi v0.3.2 h1:wsEwgAN+C9U06l9dCVMX0/L3x7ptvY1qmjMwyfE6USY= github.com/charmbracelet/x/ansi v0.3.2/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -26,18 +18,19 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-stomp/stomp/v3 v3.1.2 h1:kmrNek021BsFUO8rxDhbkOYslRomKO/JIrUCIqyL0r8= github.com/go-stomp/stomp/v3 v3.1.2/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= @@ -57,17 +50,19 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f h1:WVdm43rq1NhTOuRrxn73Tw4j9rrrC/ilOEKWTYxFWqo= +github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= -github.com/onsi/gomega v1.28.1 h1:MijcGUbfYuznzK/5R4CPNoUP/9Xvuo20sXfEm6XxoTA= -github.com/onsi/gomega v1.28.1/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= +github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7 h1:PqmnnV/+mTJp+yKe+cHKtvi9lM7PgmIL6prhFRL1qnA= -github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7/go.mod h1:fatkNU802Qu/1pmg6x7pmHC6GKLTJsmjNVPrJ5BoZfE= +github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0 h1:aX62gNFKXqBsacs3ejwl21dfyP8GT8WUPX04cRuLzwQ= +github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM= github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs= github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -96,18 +91,14 @@ github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tz github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= -golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= @@ -118,20 +109,18 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= -golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/amqp10_client/consumer.go b/pkg/amqp10_client/consumer.go index 01f917b..47bcf6d 100644 --- a/pkg/amqp10_client/consumer.go +++ b/pkg/amqp10_client/consumer.go @@ -112,7 +112,14 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) { for c.Receiver == nil { mgmt.DeclareAndBind(c.Config, c.Terminus, c.Id) - receiver, err := c.Session.NewReceiver(ctx, c.Terminus, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Properties: buildLinkProperties(c.Config), Filters: buildLinkFilters(c.Config)}) + receiver, err := c.Session.NewReceiver(ctx, + c.Terminus, + &amqp.ReceiverOptions{ + SourceDurability: durability, + Credit: int32(c.Config.ConsumerCredits), + Properties: buildLinkProperties(c.Config), + Filters: buildLinkFilters(c.Config), + }) if err != nil { if err == context.Canceled { return @@ -159,11 +166,19 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) { if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) { metrics.MessagesConsumedOutOfOrderMetric(priority).Inc() - log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent) + log.Info("Out of order message received. This message was sent before the previous message", + "this messsage", timeSent, + "previous message", previousMessageTimeSent) } previousMessageTimeSent = timeSent - log.Debug("message received", "id", c.Id, "terminus", c.Terminus, "size", len(payload), "priority", priority, "latency", latency, "appProps", msg.ApplicationProperties) + log.Debug("message received", + "id", c.Id, + "terminus", c.Terminus, + "size", len(payload), + "priority", priority, + "latency", latency, + "appProps", msg.ApplicationProperties) if c.Config.ConsumerLatency > 0 { log.Debug("consumer latency", "id", c.Id, "latency", c.Config.ConsumerLatency) @@ -249,7 +264,18 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter { } for appProperty, filterExpression := range cfg.Amqp.AppPropertyFilters { - filters = append(filters, amqp.NewLinkFilter("amqp:application-properties-filter", 0, map[string]any{appProperty: filterExpression})) + filters = append(filters, amqp.NewLinkFilter("amqp:application-properties-filter", + 0, + map[string]any{ + appProperty: filterExpression})) + } + + for property, filterExpression := range cfg.Amqp.PropertyFilters { + filters = append(filters, + amqp.NewLinkFilter("amqp:properties-filter", + 0, + map[amqp.Symbol]any{ + amqp.Symbol(property): filterExpression})) } return filters } diff --git a/pkg/amqp10_client/publisher.go b/pkg/amqp10_client/publisher.go index be2b20f..053882f 100644 --- a/pkg/amqp10_client/publisher.go +++ b/pkg/amqp10_client/publisher.go @@ -207,8 +207,8 @@ func (p *Amqp10Publisher) Send() error { } } - if p.Config.Amqp.Subject != "" { - msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subject} + if len(p.Config.Amqp.Subjects) > 0 { + msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subjects[metrics.MessagesPublished.Get()%uint64(len(p.Config.Amqp.Subjects))]} } if p.Config.StreamFilterValueSet != "" { @@ -240,7 +240,7 @@ func (p *Amqp10Publisher) Send() error { // rejected messages are not counted as published, maybe they should be? metrics.MessagesPublished.Inc() metrics.PublishingLatency.Update(latency.Seconds()) - log.Debug("message sent", "id", p.Id, "destination", p.Terminus, "latency", latency, "appProps", msg.ApplicationProperties) + log.Debug("message sent", "id", p.Id, "destination", p.Terminus, "latency", latency, "appProps", msg.ApplicationProperties, "subject", *msg.Properties.Subject) return nil } diff --git a/pkg/config/config.go b/pkg/config/config.go index b6999b0..346afe2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -45,11 +45,12 @@ var QueueTypes = map[QueueType][]string{ } type AmqpOptions struct { - Subject string + Subjects []string SendSettled bool ReleaseRate int RejectRate int BindingKey string + PropertyFilters map[string]string AppProperties map[string][]string AppPropertyFilters map[string]string }