From 560af9680134ad2c27642e1187258d3a2151e9ec Mon Sep 17 00:00:00 2001 From: Pau Peinado Date: Fri, 13 Sep 2024 13:44:03 +0200 Subject: [PATCH] Allow middlewares between writer and reader --- Makefile | 6 +-- README.md | 15 +++--- docker-compose.yaml | 28 +++++++--- go.mod | 24 ++++++--- go.sum | 60 ++++++++++++++++++++++ mocks/Channel.go | 28 +++++++++- mocks/Connection.go | 20 +++++++- mocks/WriteAdapter.go | 36 +++++++++++-- pkg/main.go | 18 +++++-- pkg/messages/ack.go | 6 --- pkg/messages/message.go | 20 ++++---- pkg/middleware/manager.go | 11 ++-- pkg/readers/adapters/amqp.go | 12 +++-- pkg/readers/adapters/amqp_test.go | 35 +++++++------ pkg/readers/adapters/dummy.go | 5 +- pkg/readers/adapters/dummy_test.go | 13 ++--- pkg/readers/adapters/interface.go | 3 +- pkg/readers/reader.go | 18 +++---- pkg/readers/reader_test.go | 9 ++-- pkg/writers/adapters/dummy.go | 13 ++--- pkg/writers/adapters/elasticsearch.go | 15 +++--- pkg/writers/adapters/elasticsearch_test.go | 25 ++++----- pkg/writers/adapters/interface.go | 5 +- pkg/writers/adapters/nop.go | 7 +-- pkg/writers/writer.go | 17 +++--- pkg/writers/writer_test.go | 21 ++++---- 26 files changed, 319 insertions(+), 151 deletions(-) delete mode 100644 pkg/messages/ack.go diff --git a/Makefile b/Makefile index 769c301..0387255 100644 --- a/Makefile +++ b/Makefile @@ -16,8 +16,8 @@ stress-build: dep generate-proto docker-build: docker build -t softonic/homing-pigeon:${TAG} . mock: - mockery -name=WriteAdapter -recursive - mockery -name=Channel -recursive - mockery -name=Connection -recursive + mockery --name=WriteAdapter -r + mockery --name=Channel -r + mockery --name=Connection -r test: gotest -race -count=1 ./... -v diff --git a/README.md b/README.md index 3c3caf9..8ad6df9 100644 --- a/README.md +++ b/README.md @@ -69,13 +69,14 @@ In order to start up correctly, it needs well defined environment variables: ##### Core -| Name | Value | -|-----------------------|----------------------------------------------------------------------------| -| MESSAGE_BUFFER_LENGTH | Buffer length for internal golang channel used for messaging | -| ACK_BUFFER_LENGTH | Buffer length for internal golang channel used for acks | -| MIDDLEWARES_SOCKET | Socket to connect to middlewares. Ex: passthrough:///unix://tmp/test.sock" | -| READ_ADAPTER | Read interface implementation. Default: AMQP | -| WRITE_ADAPTER | Write interface implementation. Default: ELASTIC | +| Name | Value | +|-------------------------------|------------------------------------------------------------------------------------------------------| +| MESSAGE_BUFFER_LENGTH | Buffer length for internal golang channel used for messaging | +| ACK_BUFFER_LENGTH | Buffer length for internal golang channel used for acks | +| REQUEST_MIDDLEWARES_SOCKET | Socket to connect to middlewares between reader and writer. Ex: passthrough:///unix://tmp/test.sock" | +| RESPONSE_MIDDLEWARES_SOCKET | Socket to connect to middlewares between writer and reader. Ex: passthrough:///unix://tmp/test.sock" | +| READ_ADAPTER | Read interface implementation. Default: AMQP | +| WRITE_ADAPTER | Write interface implementation. Default: ELASTIC | ##### Read Adapters diff --git a/docker-compose.yaml b/docker-compose.yaml index 338c87e..ac37a7f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,9 +1,7 @@ -version: '3.7' - services: homing-pigeon: volumes: - - ./sockets:/tmp + - shared_socket:/tmp - ./:/go/src/github.com/softonic/homing-pigeon build: context: . @@ -25,18 +23,28 @@ services: ACK_BUFFER_LENGTH: "500" GRPC_GO_LOG_VERBOSITY_LEVEL: 99 GRPC_GO_LOG_SEVERITY_LEVEL: info - MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/hp" + REQUEST_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/hprq" + RESPONSE_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/hprp" READ_ADAPTER: "AMQP" WRITE_ADAPTER: "ELASTIC" depends_on: rabbit-mq: condition: service_healthy - hp-middleware-pass: + request-middleware-pass: + platform: linux/amd64 + volumes: + - shared_socket:/tmp + image: softonic/hp-pass-middleware:0.1.0 + environment: + IN_SOCKET: "/tmp/hprq" + command: ["-stderrthreshold=INFO"] + reponse-middleware-pass: + platform: linux/amd64 volumes: - - ./sockets:/tmp + - shared_socket:/tmp image: softonic/hp-pass-middleware:0.1.0 environment: - IN_SOCKET: "/tmp/hp" + IN_SOCKET: "/tmp/hprp" command: ["-stderrthreshold=INFO"] rabbit-mq: image: rabbitmq:3.8-management @@ -49,6 +57,7 @@ services: timeout: 20s retries: 5 elasticsearch: + platform: linux/amd64 image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.4.2 ports: - 9200:9200 @@ -58,7 +67,12 @@ services: - bootstrap.memory_lock=true - discovery.seed_hosts=elasticsearch - cluster.initial_master_nodes=elasticsearch + - bootstrap.system_call_filter=false ulimits: memlock: soft: -1 hard: -1 + +volumes: + shared_socket: + driver: local \ No newline at end of file diff --git a/go.mod b/go.mod index a1df575..f1cefa7 100644 --- a/go.mod +++ b/go.mod @@ -4,24 +4,32 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.10.0 github.com/golang/protobuf v1.5.4 github.com/streadway/amqp v1.0.0 - github.com/stretchr/testify v1.7.0 - google.golang.org/grpc v1.66.0 + github.com/stretchr/testify v1.9.0 + google.golang.org/grpc v1.66.2 google.golang.org/protobuf v1.34.2 k8s.io/klog v1.0.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/color v1.17.0 // indirect github.com/kr/text v0.2.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.3.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect + github.com/rakyll/gotest v0.0.6 // indirect + github.com/stretchr/objx v0.5.2 // indirect + github.com/vektra/mockery v1.1.2 // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/tools v0.25.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) go 1.23 diff --git a/go.sum b/go.sum index d4ab79f..854c979 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/go-elasticsearch/v7 v7.10.0 h1:vYRwqgFM46ZUHFMRdvKr+y1WA4ehJO6WqAGV9Btbl2o= github.com/elastic/go-elasticsearch/v7 v7.10.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= @@ -13,28 +16,83 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rakyll/gotest v0.0.6 h1:hBTqkO3jiuwYW/M9gL4bu0oTYcm8J6knQAAPUsJsz1I= +github.com/rakyll/gotest v0.0.6/go.mod h1:SkoesdNCWmiD4R2dljIUcfSnNdVZ12y8qK4ojDkc2Sc= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vektra/mockery v1.1.2 h1:uc0Yn67rJpjt8U/mAZimdCKn9AeA97BOkjpmtBSlfP4= +github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU= +github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= +google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -43,5 +101,7 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= diff --git a/mocks/Channel.go b/mocks/Channel.go index d172829..52076da 100644 --- a/mocks/Channel.go +++ b/mocks/Channel.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. package mocks @@ -13,6 +13,10 @@ type Channel struct { func (_m *Channel) Ack(tag uint64, multiple bool) error { ret := _m.Called(tag, multiple) + if len(ret) == 0 { + panic("no return value specified for Ack") + } + var r0 error if rf, ok := ret.Get(0).(func(uint64, bool) error); ok { r0 = rf(tag, multiple) @@ -27,6 +31,10 @@ func (_m *Channel) Ack(tag uint64, multiple bool) error { func (_m *Channel) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -41,6 +49,10 @@ func (_m *Channel) Close() error { func (_m *Channel) Nack(tag uint64, multiple bool, requeue bool) error { ret := _m.Called(tag, multiple, requeue) + if len(ret) == 0 { + panic("no return value specified for Nack") + } + var r0 error if rf, ok := ret.Get(0).(func(uint64, bool, bool) error); ok { r0 = rf(tag, multiple, requeue) @@ -50,3 +62,17 @@ func (_m *Channel) Nack(tag uint64, multiple bool, requeue bool) error { return r0 } + +// NewChannel creates a new instance of Channel. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewChannel(t interface { + mock.TestingT + Cleanup(func()) +}) *Channel { + mock := &Channel{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/Connection.go b/mocks/Connection.go index 64c8846..d68fed3 100644 --- a/mocks/Connection.go +++ b/mocks/Connection.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. package mocks @@ -13,6 +13,10 @@ type Connection struct { func (_m *Connection) Close() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Close") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -22,3 +26,17 @@ func (_m *Connection) Close() error { return r0 } + +// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConnection(t interface { + mock.TestingT + Cleanup(func()) +}) *Connection { + mock := &Connection{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/WriteAdapter.go b/mocks/WriteAdapter.go index e67a946..5a5929e 100644 --- a/mocks/WriteAdapter.go +++ b/mocks/WriteAdapter.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. package mocks @@ -18,6 +18,10 @@ type WriteAdapter struct { func (_m *WriteAdapter) GetTimeout() time.Duration { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetTimeout") + } + var r0 time.Duration if rf, ok := ret.Get(0).(func() time.Duration); ok { r0 = rf() @@ -29,15 +33,19 @@ func (_m *WriteAdapter) GetTimeout() time.Duration { } // ProcessMessages provides a mock function with given fields: msgs -func (_m *WriteAdapter) ProcessMessages(msgs []messages.Message) []messages.Ack { +func (_m *WriteAdapter) ProcessMessages(msgs []messages.Message) []messages.Message { ret := _m.Called(msgs) - var r0 []messages.Ack - if rf, ok := ret.Get(0).(func([]messages.Message) []messages.Ack); ok { + if len(ret) == 0 { + panic("no return value specified for ProcessMessages") + } + + var r0 []messages.Message + if rf, ok := ret.Get(0).(func([]messages.Message) []messages.Message); ok { r0 = rf(msgs) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]messages.Ack) + r0 = ret.Get(0).([]messages.Message) } } @@ -48,6 +56,10 @@ func (_m *WriteAdapter) ProcessMessages(msgs []messages.Message) []messages.Ack func (_m *WriteAdapter) ShouldProcess(msgs []messages.Message) bool { ret := _m.Called(msgs) + if len(ret) == 0 { + panic("no return value specified for ShouldProcess") + } + var r0 bool if rf, ok := ret.Get(0).(func([]messages.Message) bool); ok { r0 = rf(msgs) @@ -57,3 +69,17 @@ func (_m *WriteAdapter) ShouldProcess(msgs []messages.Message) bool { return r0 } + +// NewWriteAdapter creates a new instance of WriteAdapter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWriteAdapter(t interface { + mock.TestingT + Cleanup(func()) +}) *WriteAdapter { + mock := &WriteAdapter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/main.go b/pkg/main.go index 5920b7b..d443005 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -4,6 +4,7 @@ import ( "os" "strconv" + "github.com/softonic/homing-pigeon/pkg/helpers" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/softonic/homing-pigeon/pkg/middleware" "github.com/softonic/homing-pigeon/pkg/readers" @@ -19,25 +20,32 @@ func main() { msgCh2 := make(chan messages.Message, bufLen) bufLen = GetBufferLength("ACK_BUFFER_LENGTH") - ackCh := make(chan messages.Ack, bufLen) + ackCh1 := make(chan messages.Message, bufLen) + ackCh2 := make(chan messages.Message, bufLen) - reader, err := readers.NewReader(msgCh1, ackCh) + reader, err := readers.NewReader(msgCh1, ackCh1) if err != nil { panic(err) } - writer, err := writers.NewWriter(msgCh2, ackCh) + writer, err := writers.NewWriter(msgCh2, ackCh2) if err != nil { panic(err) } - middleware := middleware.NewMiddlewareManager(msgCh1, msgCh2) + requestMiddleWareAddress := helpers.GetEnv("REQUEST_MIDDLEWARES_SOCKET", "") + requestMiddleware := middleware.NewMiddlewareManager(msgCh1, msgCh2, requestMiddleWareAddress) + + responseMiddleWareAddress := helpers.GetEnv("RESPONSE_MIDDLEWARES_SOCKET", "") + responseMiddleware := middleware.NewMiddlewareManager(ackCh2, ackCh1, responseMiddleWareAddress) go reader.Start() - go middleware.Start() + go requestMiddleware.Start() + go responseMiddleware.Start() writer.Start() } +// GetBufferLength returns the buffer length for bufferKey env func GetBufferLength(bufferKey string) int { bufLen, err := strconv.Atoi(os.Getenv(bufferKey)) if err != nil { diff --git a/pkg/messages/ack.go b/pkg/messages/ack.go deleted file mode 100644 index 32ff3cd..0000000 --- a/pkg/messages/ack.go +++ /dev/null @@ -1,6 +0,0 @@ -package messages - -type Ack struct { - Id interface{} - Ack bool -} diff --git a/pkg/messages/message.go b/pkg/messages/message.go index 5d8d51a..4e9c95e 100644 --- a/pkg/messages/message.go +++ b/pkg/messages/message.go @@ -8,27 +8,27 @@ type Message struct { acked bool } -func (m Message) Nack() (Ack, error) { +func (m Message) Nack() (Message, error) { err := m.setAsAcked() if err != nil { - return Ack{}, err + return Message{}, err } - return Ack{ - Id: m.Id, - Ack: false, + return Message{ + Id: m.Id, + Body: []byte{0}, }, nil } -func (m Message) Ack() (Ack, error) { +func (m Message) Ack() (Message, error) { err := m.setAsAcked() if err != nil { - return Ack{}, err + return Message{}, err } - return Ack{ - Id: m.Id, - Ack: true, + return Message{ + Id: m.Id, + Body: []byte{1}, }, nil } diff --git a/pkg/middleware/manager.go b/pkg/middleware/manager.go index ec13954..0c8eeb7 100644 --- a/pkg/middleware/manager.go +++ b/pkg/middleware/manager.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/softonic/homing-pigeon/pkg/helpers" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/softonic/homing-pigeon/proto" "google.golang.org/grpc" @@ -19,6 +18,7 @@ type MiddlwareManager struct { MiddlewareAddress string } +// Start starts the middleware manager. func (m *MiddlwareManager) Start() { if m.isMiddlewareNotAvailable() { klog.V(1).Infof("Middlewares not available") @@ -66,10 +66,11 @@ func (m *MiddlwareManager) isMiddlewareNotAvailable() bool { return m.MiddlewareAddress == "" } -func NewMiddlewareManager(msgCh1 chan messages.Message, msgCh2 chan messages.Message) *MiddlwareManager { +// NewMiddlewareManager creates a new instance of MiddlwareManager. +func NewMiddlewareManager(inputChannel chan messages.Message, outputChannel chan messages.Message, address string) *MiddlwareManager { return &MiddlwareManager{ - InputChannel: msgCh1, - OutputChannel: msgCh2, - MiddlewareAddress: helpers.GetEnv("MIDDLEWARES_SOCKET", ""), + InputChannel: inputChannel, + OutputChannel: outputChannel, + MiddlewareAddress: address, } } diff --git a/pkg/readers/adapters/amqp.go b/pkg/readers/adapters/amqp.go index 945709f..669a72a 100644 --- a/pkg/readers/adapters/amqp.go +++ b/pkg/readers/adapters/amqp.go @@ -52,9 +52,9 @@ func (a *Amqp) processMessages(writeChannel chan<- messages.Message) { } } -func (a *Amqp) HandleAck(ackChannel <-chan messages.Ack) { +func (a *Amqp) HandleAck(ackChannel <-chan messages.Message) { for ack := range ackChannel { - if ack.Ack { + if ack.Body[0] == 1 { err := a.Ch.Ack(ack.Id.(uint64), false) if err != nil { klog.Error(err) @@ -69,7 +69,8 @@ func (a *Amqp) HandleAck(ackChannel <-chan messages.Ack) { } } -func NewAmqpReaderAdapter(config amqpAdapter.Config) (ReadAdapter, error) { +// NewAMQPAdapter creates a new instance of AMQP adapter. +func NewAMQPAdapter() (ReadAdapter, error) { failOnError := func(err error, msg string) { if err != nil { klog.Errorf("%s: %s", msg, err) @@ -79,6 +80,11 @@ func NewAmqpReaderAdapter(config amqpAdapter.Config) (ReadAdapter, error) { var conn *amqp.Connection caPath := os.Getenv("RABBITMQ_CA_PATH") + config, err := NewAmqpConfig() + if err != nil { + return nil, err + } + if caPath != "" { cfg := new(tls.Config) cfg.RootCAs = x509.NewCertPool() diff --git a/pkg/readers/adapters/amqp_test.go b/pkg/readers/adapters/amqp_test.go index 2ef85ff..05bfd45 100644 --- a/pkg/readers/adapters/amqp_test.go +++ b/pkg/readers/adapters/amqp_test.go @@ -1,12 +1,13 @@ package adapters import ( + "testing" + "time" + "github.com/softonic/homing-pigeon/mocks" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/streadway/amqp" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestProcessMessage(t *testing.T) { @@ -43,7 +44,7 @@ func TestProcessMessage(t *testing.T) { func TestHandleAck(t *testing.T) { expectedMessages := 1 - ackChannel := make(chan messages.Ack, expectedMessages+1) + ackChannel := make(chan messages.Message, expectedMessages+1) channel := new(mocks.Channel) expectedId := uint64(42) @@ -55,9 +56,9 @@ func TestHandleAck(t *testing.T) { Ch: channel, } - ackChannel <- messages.Ack{ - Id: expectedId, - Ack: true, + ackChannel <- messages.Message{ + Id: expectedId, + Body: []byte{1}, } go obj.HandleAck(ackChannel) @@ -74,7 +75,7 @@ func TestHandleAck(t *testing.T) { func TestHandleNack(t *testing.T) { expectedMessages := 1 - ackChannel := make(chan messages.Ack, expectedMessages+1) + ackChannel := make(chan messages.Message, expectedMessages+1) channel := new(mocks.Channel) expectedId := uint64(42) @@ -86,9 +87,9 @@ func TestHandleNack(t *testing.T) { Ch: channel, } - ackChannel <- messages.Ack{ - Id: expectedId, - Ack: false, + ackChannel <- messages.Message{ + Id: expectedId, + Body: []byte{0}, } go obj.HandleAck(ackChannel) @@ -105,7 +106,7 @@ func TestHandleNack(t *testing.T) { func TestHandleMixedAcks(t *testing.T) { expectedMessages := 1 - ackChannel := make(chan messages.Ack, expectedMessages+1) + ackChannel := make(chan messages.Message, expectedMessages+1) channel := new(mocks.Channel) expectedAckId := uint64(42) @@ -119,13 +120,13 @@ func TestHandleMixedAcks(t *testing.T) { Ch: channel, } - ackChannel <- messages.Ack{ - Id: expectedAckId, - Ack: true, + ackChannel <- messages.Message{ + Id: expectedAckId, + Body: []byte{1}, } - ackChannel <- messages.Ack{ - Id: expectedNackId, - Ack: false, + ackChannel <- messages.Message{ + Id: expectedNackId, + Body: []byte{0}, } go obj.HandleAck(ackChannel) diff --git a/pkg/readers/adapters/dummy.go b/pkg/readers/adapters/dummy.go index b8e4302..4d5ac8d 100644 --- a/pkg/readers/adapters/dummy.go +++ b/pkg/readers/adapters/dummy.go @@ -2,8 +2,9 @@ package adapters import ( "fmt" - "github.com/softonic/homing-pigeon/pkg/messages" "strconv" + + "github.com/softonic/homing-pigeon/pkg/messages" ) type Dummy struct{} @@ -18,7 +19,7 @@ func (d *Dummy) Listen(msgChannel chan<- messages.Message) { } } -func (d *Dummy) HandleAck(ackChannel <-chan messages.Ack) { +func (d *Dummy) HandleAck(ackChannel <-chan messages.Message) { for ack := range ackChannel { fmt.Print("Acked " + strconv.Itoa(int(ack.Id.(uint64))) + "\n") } diff --git a/pkg/readers/adapters/dummy_test.go b/pkg/readers/adapters/dummy_test.go index 5b0ea35..05b0878 100644 --- a/pkg/readers/adapters/dummy_test.go +++ b/pkg/readers/adapters/dummy_test.go @@ -1,10 +1,11 @@ package adapters import ( - "github.com/softonic/homing-pigeon/pkg/messages" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/softonic/homing-pigeon/pkg/messages" + "github.com/stretchr/testify/assert" ) func TestProduceMessageQuantity(t *testing.T) { @@ -18,10 +19,10 @@ func TestProduceMessageQuantity(t *testing.T) { } func TestAcksAreRead(t *testing.T) { - ackChannel := make(chan messages.Ack, 2) - ackChannel <- messages.Ack{ - Id: uint64(1), - Ack: true, + ackChannel := make(chan messages.Message, 2) + ackChannel <- messages.Message{ + Id: uint64(1), + Body: []byte{1}, } obj := new(Dummy) diff --git a/pkg/readers/adapters/interface.go b/pkg/readers/adapters/interface.go index 54da8ce..65ec84f 100644 --- a/pkg/readers/adapters/interface.go +++ b/pkg/readers/adapters/interface.go @@ -2,7 +2,8 @@ package adapters import "github.com/softonic/homing-pigeon/pkg/messages" +// ReadAdapter is an interface for specific reader implementations. type ReadAdapter interface { Listen(msgChannel chan<- messages.Message) - HandleAck(ackChannel <-chan messages.Ack) + HandleAck(ackChannel <-chan messages.Message) } diff --git a/pkg/readers/reader.go b/pkg/readers/reader.go index 3be224a..5e56de1 100644 --- a/pkg/readers/reader.go +++ b/pkg/readers/reader.go @@ -7,18 +7,21 @@ import ( "k8s.io/klog" ) +// Reader represents a reader for handling messages. type Reader struct { ReadAdapter adapters.ReadAdapter MsgChannel chan<- messages.Message - AckChannel <-chan messages.Ack + AckChannel <-chan messages.Message } +// Start starts the reader. func (r *Reader) Start() { go r.ReadAdapter.HandleAck(r.AckChannel) r.ReadAdapter.Listen(r.MsgChannel) } -func NewReader(inputChannel chan messages.Message, ackChannel chan messages.Ack) (*Reader, error) { +// NewReader creates a new Reader instance. +func NewReader(inputChannel chan messages.Message, ackChannel chan messages.Message) (*Reader, error) { var err error var readAdapter adapters.ReadAdapter @@ -26,7 +29,7 @@ func NewReader(inputChannel chan messages.Message, ackChannel chan messages.Ack) switch adapter { case "AMQP": - readAdapter, err = NewAMQPAdapter() + readAdapter, err = adapters.NewAMQPAdapter() default: klog.Warning("Reader not defined, using dummy implementation") readAdapter, err = &adapters.Dummy{}, nil @@ -38,12 +41,3 @@ func NewReader(inputChannel chan messages.Message, ackChannel chan messages.Ack) AckChannel: ackChannel, }, err } - -func NewAMQPAdapter() (adapters.ReadAdapter, error) { - - amqpConfig, err := adapters.NewAmqpConfig() - if err != nil { - return nil, err - } - return adapters.NewAmqpReaderAdapter(amqpConfig) -} diff --git a/pkg/readers/reader_test.go b/pkg/readers/reader_test.go index aa81e89..f55e23c 100644 --- a/pkg/readers/reader_test.go +++ b/pkg/readers/reader_test.go @@ -1,10 +1,11 @@ package readers import ( - "github.com/softonic/homing-pigeon/pkg/messages" - "github.com/stretchr/testify/mock" "sync" "testing" + + "github.com/softonic/homing-pigeon/pkg/messages" + "github.com/stretchr/testify/mock" ) type readAdapterMock struct { @@ -15,7 +16,7 @@ type readAdapterMock struct { func (r *readAdapterMock) Listen(msgChannel chan<- messages.Message) { r.Called(msgChannel) } -func (r *readAdapterMock) HandleAck(ackChannel <-chan messages.Ack) { +func (r *readAdapterMock) HandleAck(ackChannel <-chan messages.Message) { r.Called(ackChannel) r.wg.Done() } @@ -23,7 +24,7 @@ func (r *readAdapterMock) HandleAck(ackChannel <-chan messages.Ack) { func TestAdapterIsStarted(t *testing.T) { readAdapterMock := new(readAdapterMock) msgChannel := make(chan<- messages.Message) - ackChannel := make(<-chan messages.Ack) + ackChannel := make(<-chan messages.Message) readAdapterMock.wg.Add(1) readAdapterMock.On("Listen", msgChannel) diff --git a/pkg/writers/adapters/dummy.go b/pkg/writers/adapters/dummy.go index f96c450..2599e81 100644 --- a/pkg/writers/adapters/dummy.go +++ b/pkg/writers/adapters/dummy.go @@ -1,18 +1,19 @@ package adapters import ( - "github.com/softonic/homing-pigeon/pkg/messages" "time" + + "github.com/softonic/homing-pigeon/pkg/messages" ) type Dummy struct{} -func (d *Dummy) ProcessMessages(msgs []messages.Message) []messages.Ack { - var processedMsg []messages.Ack +func (d *Dummy) ProcessMessages(msgs []messages.Message) []messages.Message { + var processedMsg []messages.Message for i := 0; i < len(msgs); i++ { - processedMsg = append(processedMsg, messages.Ack{ - Id: uint64(i), - Ack: true, + processedMsg = append(processedMsg, messages.Message{ + Id: uint64(i), + Body: []byte{1}, }) } return processedMsg diff --git a/pkg/writers/adapters/elasticsearch.go b/pkg/writers/adapters/elasticsearch.go index 1298d4a..b64f82d 100644 --- a/pkg/writers/adapters/elasticsearch.go +++ b/pkg/writers/adapters/elasticsearch.go @@ -4,14 +4,15 @@ import ( "bytes" "encoding/json" "errors" + "os" + "strconv" + "time" + "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/softonic/homing-pigeon/pkg/messages" esAdapter "github.com/softonic/homing-pigeon/pkg/writers/adapters/elasticsearch" "k8s.io/klog" - "os" - "strconv" - "time" ) type Elasticsearch struct { @@ -20,8 +21,8 @@ type Elasticsearch struct { Bulk esapi.Bulk } -func (es *Elasticsearch) ProcessMessages(msgs []messages.Message) []messages.Ack { - acks := make([]messages.Ack, len(msgs)) +func (es *Elasticsearch) ProcessMessages(msgs []messages.Message) []messages.Message { + acks := make([]messages.Message, len(msgs)) if len(msgs) == 0 { return acks @@ -61,7 +62,7 @@ func (es *Elasticsearch) ProcessMessages(msgs []messages.Message) []messages.Ack return acks } -func (es *Elasticsearch) setAcksFromResponse(response esAdapter.ElasticSearchBulkResponse, msgs []messages.Message, acks []messages.Ack) { +func (es *Elasticsearch) setAcksFromResponse(response esAdapter.ElasticSearchBulkResponse, msgs []messages.Message, acks []messages.Message) { maxValidStatus := 299 responseItemPos := 0 @@ -103,7 +104,7 @@ func (es *Elasticsearch) getResponseFromResult(result *esapi.Response) esAdapter return response } -func (es *Elasticsearch) setAllNacks(msgs []messages.Message, acks []messages.Ack) { +func (es *Elasticsearch) setAllNacks(msgs []messages.Message, acks []messages.Message) { for i, msg := range msgs { nack, err := msg.Nack() if err == nil { diff --git a/pkg/writers/adapters/elasticsearch_test.go b/pkg/writers/adapters/elasticsearch_test.go index 4afe95d..5b32e42 100644 --- a/pkg/writers/adapters/elasticsearch_test.go +++ b/pkg/writers/adapters/elasticsearch_test.go @@ -2,13 +2,14 @@ package adapters import ( "bytes" + "io" + "strings" + "testing" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "io" - "strings" - "testing" ) /** @@ -56,7 +57,7 @@ func TestAdapterReceiveInvalidMessage(t *testing.T) { bulk.AssertNotCalled(t, "func1") assert.Len(t, acks, 1) - assert.False(t, acks[0].Ack) + assert.False(t, acks[0].Body[0] != 0) } func TestBulkActionWithErrorsMustDiscardAllMessages(t *testing.T) { @@ -83,7 +84,7 @@ func TestBulkActionWithErrorsMustDiscardAllMessages(t *testing.T) { bulk.AssertExpectations(t) assert.Len(t, acks, 1) - assert.False(t, acks[0].Ack) + assert.True(t, acks[0].Body[0] == 0) } func TestBulkActionWithSingleItemSucessful(t *testing.T) { @@ -110,7 +111,7 @@ func TestBulkActionWithSingleItemSucessful(t *testing.T) { bulk.AssertExpectations(t) assert.Len(t, acks, 1) - assert.True(t, acks[0].Ack) + assert.True(t, acks[0].Body[0] == 1) } func TestBulkActionWithSingleItemUnsuccessful(t *testing.T) { @@ -137,7 +138,7 @@ func TestBulkActionWithSingleItemUnsuccessful(t *testing.T) { bulk.AssertExpectations(t) assert.Len(t, acks, 1) - assert.False(t, acks[0].Ack) + assert.True(t, acks[0].Body[0] == 0) } func TestBulkActionWithMixedItemStatus(t *testing.T) { @@ -172,9 +173,9 @@ func TestBulkActionWithMixedItemStatus(t *testing.T) { bulk.AssertExpectations(t) assert.Len(t, acks, 3) - assert.False(t, acks[0].Ack) - assert.True(t, acks[1].Ack) - assert.False(t, acks[2].Ack) + assert.True(t, acks[0].Body[0] == 0) + assert.True(t, acks[1].Body[0] == 1) + assert.True(t, acks[2].Body[0] == 0) } func TestBulkActionWithOnlyMetadata(t *testing.T) { @@ -202,7 +203,7 @@ func TestBulkActionWithOnlyMetadata(t *testing.T) { bulk.AssertExpectations(t) assert.Len(t, acks, 1) - assert.True(t, acks[0].Ack) + assert.True(t, acks[0].Body[0] == 1) } func TestBulkActionWithNoMetadata(t *testing.T) { @@ -223,5 +224,5 @@ func TestBulkActionWithNoMetadata(t *testing.T) { bulk.AssertNotCalled(t, "func1", mock.Anything) bulk.AssertExpectations(t) assert.Len(t, acks, 1) - assert.False(t, acks[0].Ack) + assert.Empty(t, acks[0].Body) } diff --git a/pkg/writers/adapters/interface.go b/pkg/writers/adapters/interface.go index 3331b30..9a312ed 100644 --- a/pkg/writers/adapters/interface.go +++ b/pkg/writers/adapters/interface.go @@ -1,12 +1,13 @@ package adapters import ( - "github.com/softonic/homing-pigeon/pkg/messages" "time" + + "github.com/softonic/homing-pigeon/pkg/messages" ) type WriteAdapter interface { - ProcessMessages(msgs []messages.Message) []messages.Ack + ProcessMessages(msgs []messages.Message) []messages.Message ShouldProcess(msgs []messages.Message) bool GetTimeout() time.Duration } diff --git a/pkg/writers/adapters/nop.go b/pkg/writers/adapters/nop.go index 3bf6010..1b1e292 100644 --- a/pkg/writers/adapters/nop.go +++ b/pkg/writers/adapters/nop.go @@ -1,14 +1,15 @@ package adapters import ( - "github.com/softonic/homing-pigeon/pkg/messages" "time" + + "github.com/softonic/homing-pigeon/pkg/messages" ) type Nop struct{} -func (wa *Nop) ProcessMessages(msgs []messages.Message) []messages.Ack { - acks := make([]messages.Ack, 0) +func (wa *Nop) ProcessMessages(msgs []messages.Message) []messages.Message { + acks := make([]messages.Message, 0) for _, msg := range msgs { ack, err := msg.Ack() if err == nil { diff --git a/pkg/writers/writer.go b/pkg/writers/writer.go index d06243e..f766e27 100644 --- a/pkg/writers/writer.go +++ b/pkg/writers/writer.go @@ -1,17 +1,18 @@ package writers import ( + "sync" + "time" + "github.com/softonic/homing-pigeon/pkg/helpers" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/softonic/homing-pigeon/pkg/writers/adapters" "k8s.io/klog" - "sync" - "time" ) type Writer struct { MsgChannel <-chan messages.Message - AckChannel chan<- messages.Ack + AckChannel chan<- messages.Message msgs []messages.Message WriteAdapter adapters.WriteAdapter mutex *sync.Mutex @@ -35,7 +36,7 @@ func (ew *Writer) shouldProcess() bool { return res } -func (ew *Writer) processAllMessages(msgChannel <-chan messages.Message, ackChannel chan<- messages.Ack) { +func (ew *Writer) processAllMessages(msgChannel <-chan messages.Message, ackChannel chan<- messages.Message) { for msg := range msgChannel { ew.appendMessage(msg) if ew.shouldProcess() { @@ -44,14 +45,14 @@ func (ew *Writer) processAllMessages(msgChannel <-chan messages.Message, ackChan } } -func (ew *Writer) timeout(ackChannel chan<- messages.Ack) { +func (ew *Writer) timeout(ackChannel chan<- messages.Message) { for { time.Sleep(ew.WriteAdapter.GetTimeout()) go ew.trigger(ackChannel) } } -func (ew *Writer) trigger(ackChannel chan<- messages.Ack) { +func (ew *Writer) trigger(ackChannel chan<- messages.Message) { ew.mutex.Lock() acks := ew.WriteAdapter.ProcessMessages(ew.msgs) @@ -61,13 +62,13 @@ func (ew *Writer) trigger(ackChannel chan<- messages.Ack) { ew.mutex.Unlock() } -func (ew *Writer) sendAcks(acks []messages.Ack, ackChannel chan<- messages.Ack) { +func (ew *Writer) sendAcks(acks []messages.Message, ackChannel chan<- messages.Message) { for _, ack := range acks { ackChannel <- ack } } -func NewWriter(outputChannel chan messages.Message, ackChannel chan messages.Ack) (*Writer, error) { +func NewWriter(outputChannel chan messages.Message, ackChannel chan messages.Message) (*Writer, error) { var err error var writeAdapter adapters.WriteAdapter diff --git a/pkg/writers/writer_test.go b/pkg/writers/writer_test.go index adab109..1b958bc 100644 --- a/pkg/writers/writer_test.go +++ b/pkg/writers/writer_test.go @@ -1,12 +1,13 @@ package writers import ( + "testing" + "time" + "github.com/softonic/homing-pigeon/mocks" "github.com/softonic/homing-pigeon/pkg/messages" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "testing" - "time" ) func TestAdapterProcessSingleMessage(t *testing.T) { @@ -18,7 +19,7 @@ func TestAdapterProcessSingleMessage(t *testing.T) { mockProcessMessages(writeAdapter) msgChannel := make(chan messages.Message, 1) - ackChannel := make(chan messages.Ack, 1) + ackChannel := make(chan messages.Message, 1) writer := Writer{ WriteAdapter: writeAdapter, @@ -57,7 +58,7 @@ func TestAdapterProcessBulkMessages(t *testing.T) { mockProcessMessages(writeAdapter) msgChannel := make(chan messages.Message, expectedMessages+1) - ackChannel := make(chan messages.Ack, expectedMessages+1) + ackChannel := make(chan messages.Message, expectedMessages+1) writer := Writer{ WriteAdapter: writeAdapter, @@ -90,7 +91,7 @@ func TestAdapterTimeoutProcessMessages(t *testing.T) { mockProcessMessages(writeAdapter) msgChannel := make(chan messages.Message, expectedMessages+1) - ackChannel := make(chan messages.Ack, expectedMessages+1) + ackChannel := make(chan messages.Message, expectedMessages+1) writer := Writer{ WriteAdapter: writeAdapter, @@ -129,12 +130,12 @@ func writeMessagesToChannel(msgChannel *chan messages.Message) { } } -func getAcks(n int) []messages.Ack { - acks := make([]messages.Ack, n) +func getAcks(n int) []messages.Message { + acks := make([]messages.Message, n) for i := 0; i < n; i++ { - acks[i] = messages.Ack{ - Id: uint64(i), - Ack: false, + acks[i] = messages.Message{ + Id: uint64(i), + Body: []byte{0}, } } return acks