Skip to content

Commit

Permalink
Fix retry mechanism for APNS (#54)
Browse files Browse the repository at this point in the history
* Fix data races wip

* wip

* wip

* wip

* wip

* wip

* wip

* Fix timeout

* fix test

* Add missing lock

* Rollback to lower case

* Remove commented code

* Add missing lock/unlock

* Use single lock/unlock on invalid_token_handler

* Add missing lock/unlock

* Prevent deadlock on retry

* More logs on apns initialization

* More logs on apns client

* Try fix infinite loop

* Fix mock

* More logging

* More logs

* Call Resume only when Pause is called

* Remove pause + resume

* Fix data races

* Some more data race fixes

* Add e2e tests for apns

* Fix test

* Try fix test on CI

* Try fix tests

* Fix test

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Fix tests

* Remove unnecessary config file

* Increase request timeout

* Add a log + remove code

* Fix test name

* Allow go stats report on e2e

* Fix error log

* Replace images on CI

* Try fix CI

* Try fix CI

* Fix test

* Fix test

* GinkgoRecover

* Try fix kafka on CI

* Fix test

* Try fix Ci on Kafka

* Fix action yaml

* Try fix Kafka address on CI

* Use localhost on CI

* Try fix CI

* Add health check on kafka container on CI

* Increase health check timeout

* Change kafka replication factor on Ci

* Change hostname on kafka health check on CI

* Change eventually assertion

* Try fix test

* Try fix test

* Tweak configs

* Try fix tests

* Try fix test
  • Loading branch information
miguelreiswildlife authored May 10, 2024
1 parent 6b8de27 commit 5eea5ce
Show file tree
Hide file tree
Showing 41 changed files with 1,370 additions and 452 deletions.
14 changes: 10 additions & 4 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@ jobs:
image: tfgco/pusher:ci-test
services:
zookeeper:
image: wurstmeister/zookeeper
image: confluentinc/cp-zookeeper:7.4.0
env:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: wurstmeister/kafka:0.10.1.0-2
image: confluentinc/cp-kafka:7.4.0
options: --health-cmd "kafka-topics --list --bootstrap-server kafka:9092" --health-interval 10s --health-timeout 10s --health-retries 15
env:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_NUM_PARTITIONS: 5
KAFKA_CREATE_TOPICS: "com.games.test:5:1"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Build
run: docker run -v $PWD:/go/src/github.com/topfreegames/pusher tfgco/pusher:ci-test go build -v -o bin/pusher main.go
- name: Test
run: docker run -v $PWD:/go/src/github.com/topfreegames/pusher tfgco/pusher:ci-test ginkgo -v -r --randomizeAllSpecs --randomizeSuites --cover --focus="\[Unit\].*" .
run: docker run -v $PWD:/go/src/github.com/topfreegames/pusher tfgco/pusher:ci-test ginkgo -v -r --randomizeAllSpecs --randomizeSuites --cover --focus="\[Unit\].*" --skipPackage=e2e .
- name: Lint
continue-on-error: true
run: docker run -v $PWD:/go/src/github.com/topfreegames/pusher tfgco/pusher:ci-test golangci-lint run
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ test-unit:
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo
@export $ACK_GINKGO_RC=true
@$(GINKGO) --race -trace -r --randomizeAllSpecs --randomizeSuites --cover --focus="\[Unit\].*" .
@$(MAKE) test-coverage-func
@$(GINKGO) -trace -r --randomizeAllSpecs --randomizeSuites --cover --focus="\[Unit\].*" --skipPackage=e2e .
@#$(MAKE) test-coverage-func
@echo
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo "= Unit tests finished. ="
Expand All @@ -120,7 +120,7 @@ run-integration-test:
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo
@export $ACK_GINKGO_RC=true
@$(GINKGO) --race -trace -r -tags=integration --randomizeAllSpecs --randomizeSuites --focus="\[Integration\].*" .
@$(GINKGO) -trace -r -tags=integration --randomizeAllSpecs --randomizeSuites --focus="\[Integration\].*" .
@echo
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo "= Integration tests finished. ="
Expand Down Expand Up @@ -173,4 +173,7 @@ integration-test-container-dev: build-image-dev start-deps-container-dev test-db

.PHONY: mocks
mocks:
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/firebase/client.go
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/firebase/client.go
$(MOCKGENERATE) -source=interfaces/apns.go -destination=mocks/interfaces/apns.go
$(MOCKGENERATE) -source=interfaces/statsd.go -destination=mocks/interfaces/statsd.go
$(MOCKGENERATE) -source=interfaces/feedback_reporter.go -destination=mocks/interfaces/feedback_reporter.go
12 changes: 7 additions & 5 deletions cmd/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/config"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/pusher"
"github.com/topfreegames/pusher/util"
)

func startApns(
debug, json, production bool,
config *viper.Viper,
vConfig *viper.Viper,
config *config.Config,
statsdClientOrNil interfaces.StatsDClient,
dbOrNil interfaces.DB,
queueOrNil interfaces.APNSPushQueue,
Expand All @@ -49,7 +51,7 @@ func startApns(
} else {
log.Level = logrus.InfoLevel
}
return pusher.NewAPNSPusher(production, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
return pusher.NewAPNSPusher(production, vConfig, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
}

// apnsCmd represents the apns command
Expand All @@ -58,19 +60,19 @@ var apnsCmd = &cobra.Command{
Short: "starts pusher in apns mode",
Long: `starts pusher in apns mode`,
Run: func(cmd *cobra.Command, args []string) {
config, err := util.NewViperWithConfigFile(cfgFile)
config, vConfig, err := config.NewConfigAndViper(cfgFile)
if err != nil {
panic(err)
}

sentryURL := config.GetString("sentry.url")
sentryURL := vConfig.GetString("sentry.url")
if sentryURL != "" {
raven.SetDSN(sentryURL)
}

ctx := context.Background()

apnsPusher, err := startApns(debug, json, production, config, nil, nil, nil)
apnsPusher, err := startApns(debug, json, production, vConfig, config, nil, nil, nil)
if err != nil {
raven.CaptureErrorAndWait(err, map[string]string{
"version": util.Version,
Expand Down
21 changes: 11 additions & 10 deletions cmd/apns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,31 @@ package cmd

import (
"fmt"
"github.com/topfreegames/pusher/config"
"os"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/mocks"
"github.com/topfreegames/pusher/util"
)

var _ = Describe("APNS", func() {
cfg := os.Getenv("CONFIG_FILE")
if cfg == "" {
cfg = "../config/test.yaml"
configFile := os.Getenv("CONFIG_FILE")
if configFile == "" {
configFile = "../config/test.yaml"
}

var config *viper.Viper
var vConfig *viper.Viper
var cfg *config.Config
var mockPushQueue *mocks.APNSPushQueueMock
var mockDB *mocks.PGMock
var mockStatsDClient *mocks.StatsDClientMock

BeforeEach(func() {
var err error
config, err = util.NewViperWithConfigFile(cfg)
cfg, vConfig, err = config.NewConfigAndViper(configFile)
Expect(err).NotTo(HaveOccurred())
mockDB = mocks.NewPGMock(0, 1)
mockPushQueue = mocks.NewAPNSPushQueueMock()
Expand All @@ -56,7 +57,7 @@ var _ = Describe("APNS", func() {

Describe("[Unit]", func() {
It("Should return apnsPusher without errors", func() {
apnsPusher, err := startApns(false, false, false, config, mockStatsDClient, mockDB, mockPushQueue)
apnsPusher, err := startApns(false, false, false, vConfig, cfg, mockStatsDClient, mockDB, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.ViperConfig).NotTo(BeNil())
Expand All @@ -66,21 +67,21 @@ var _ = Describe("APNS", func() {
})

It("Should set log to json format", func() {
apnsPusher, err := startApns(false, true, false, config, mockStatsDClient, mockDB, mockPushQueue)
apnsPusher, err := startApns(false, true, false, vConfig, cfg, mockStatsDClient, mockDB, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.JSONFormatter{})))
})

It("Should set log to debug", func() {
apnsPusher, err := startApns(true, false, false, config, mockStatsDClient, mockDB, mockPushQueue)
apnsPusher, err := startApns(true, false, false, vConfig, cfg, mockStatsDClient, mockDB, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.Logger.Level).To(Equal(logrus.DebugLevel))
})

It("Should set log to production", func() {
apnsPusher, err := startApns(false, false, true, config, mockStatsDClient, mockDB, mockPushQueue)
apnsPusher, err := startApns(false, false, true, vConfig, cfg, mockStatsDClient, mockDB, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.IsProduction).To(BeTrue())
Expand Down
30 changes: 29 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,34 @@ type (
// Config is the struct that holds all the configuration for the Pusher.
Config struct {
GCM GCM
Apns Apns
Queue Kafka
GracefulShutdownTimeout int
}

Kafka struct {
Brokers string
}

GCM struct {
Apps string
PingInterval int
PingTimeout int
MaxPendingMessages int
LogStatsInterval int
}

Apns struct {
Apps string
Certs map[string]Cert
}

Cert struct {
AuthKeyPath string
KeyID string
TeamID string
Topic string
}
)

// NewConfigAndViper returns a new Config object and the corresponding viper instance.
Expand All @@ -45,7 +63,7 @@ func NewConfigAndViper(configFile string) (*Config, *viper.Viper, error) {
return config, v, nil
}

func (c *Config) GetAppsArray() []string {
func (c *Config) GetGcmAppsArray() []string {
arr := strings.Split(c.GCM.Apps, ",")
res := make([]string, 0, len(arr))
for _, a := range arr {
Expand All @@ -55,6 +73,16 @@ func (c *Config) GetAppsArray() []string {
return res
}

func (c *Config) GetApnsAppsArray() []string {
arr := strings.Split(c.Apns.Apps, ",")
res := make([]string, 0, len(arr))
for _, a := range arr {
res = append(res, strings.TrimSpace(a))
}

return res
}

func decodeHookFunc() viper.DecoderConfigOption {
hooks := mapstructure.ComposeDecodeHookFunc(
StringToMapStringHookFunc(),
Expand Down
10 changes: 5 additions & 5 deletions config/docker_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ gcm:
queue:
topics:
- "^push-[^-_]+_(apns|gcm)[_-](single|massive)"
brokers: "kafka:9092"
brokers: "kafka:29092"
group: testGroup
sessionTimeout: 6000
fetch.min.bytes: 1
Expand All @@ -36,10 +36,10 @@ feedback:
- kafka
kafka:
topics: "com.games.test.feedbacks"
brokers: "kafka:9092"
brokers: "kafka:29092"
cache:
requestTimeout: 100
cleaningInterval: 20
requestTimeout: 3000
cleaningInterval: 600
stats:
reporters:
- statsd
Expand All @@ -65,7 +65,7 @@ feedbackListeners:
queue:
topics:
- "^push-[^-_]+-(apns|gcm)-feedbacks"
brokers: "kafka:9092"
brokers: "kafka:29092"
group: testGroup
sessionTimeout: 6000
fetch.min.bytes: 1
Expand Down
4 changes: 2 additions & 2 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ feedback:
topics: "com.games.test.feedbacks"
brokers: "localhost:9941"
cache:
requestTimeout: 100
cleaningInterval: 20
requestTimeout: 3000
cleaningInterval: 600
stats:
reporters:
- statsd
Expand Down
Loading

0 comments on commit 5eea5ce

Please sign in to comment.