diff --git a/config/test.yaml b/config/test.yaml index a01e0c6..0c8b1e2 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -1,23 +1,24 @@ --- -queue: - topics: - - "com.games.test" - brokers: "localhost:9941" - group: testGroup - sessionTimeout: 6000 - offsetResetStrategy: latest - handleAllMessagesBeforeExiting: true -statsd: - host: "localhost:40001" - prefix: "push" - flushIntervalMs: 5000 -pg: - table: "test_table" - host: localhost - port: 8585 - user: pg_user - pass: "" - poolSize: 20 - maxRetries: 3 - database: test_db - connectionTimeout: 100 +extensions: + kafkaconsumer: + topics: + - "com.games.test" + brokers: "localhost:9941" + group: testGroup + sessionTimeout: 6000 + offsetResetStrategy: latest + handleAllMessagesBeforeExiting: true + statsd: + host: "localhost:40001" + prefix: "push" + flushIntervalMs: 5000 + pg: + table: "test_table" + host: localhost + port: 8585 + user: pg_user + pass: "" + poolSize: 20 + maxRetries: 3 + database: test_db + connectionTimeout: 100 diff --git a/extensions.coverprofile b/extensions.coverprofile index 2cdf0f7..01405cc 100644 --- a/extensions.coverprofile +++ b/extensions.coverprofile @@ -58,20 +58,20 @@ github.com/topfreegames/extensions/kafka_consumer.go:313.17,315.4 1 1 github.com/topfreegames/extensions/kafka_producer.go:43.135,49.27 3 2 github.com/topfreegames/extensions/kafka_producer.go:52.2,53.15 2 2 github.com/topfreegames/extensions/kafka_producer.go:49.27,51.3 1 2 -github.com/topfreegames/extensions/kafka_producer.go:56.53,59.2 2 2 -github.com/topfreegames/extensions/kafka_producer.go:61.82,72.21 6 2 -github.com/topfreegames/extensions/kafka_producer.go:82.2,84.12 3 2 -github.com/topfreegames/extensions/kafka_producer.go:72.21,75.17 3 0 -github.com/topfreegames/extensions/kafka_producer.go:75.17,78.4 2 0 -github.com/topfreegames/extensions/kafka_producer.go:79.3,81.3 1 2 -github.com/topfreegames/extensions/kafka_producer.go:87.51,91.37 2 2 -github.com/topfreegames/extensions/kafka_producer.go:91.37,92.25 1 2 -github.com/topfreegames/extensions/kafka_producer.go:93.3,95.37 2 2 -github.com/topfreegames/extensions/kafka_producer.go:108.4,108.9 1 2 -github.com/topfreegames/extensions/kafka_producer.go:109.3,110.65 1 0 -github.com/topfreegames/extensions/kafka_producer.go:95.37,101.5 2 0 -github.com/topfreegames/extensions/kafka_producer.go:101.5,107.5 1 2 -github.com/topfreegames/extensions/kafka_producer.go:116.67,125.2 2 1 +github.com/topfreegames/extensions/kafka_producer.go:56.53,58.2 1 2 +github.com/topfreegames/extensions/kafka_producer.go:60.82,71.21 6 2 +github.com/topfreegames/extensions/kafka_producer.go:81.2,83.12 3 2 +github.com/topfreegames/extensions/kafka_producer.go:71.21,74.17 3 0 +github.com/topfreegames/extensions/kafka_producer.go:74.17,77.4 2 0 +github.com/topfreegames/extensions/kafka_producer.go:78.3,80.3 1 2 +github.com/topfreegames/extensions/kafka_producer.go:86.51,90.37 2 2 +github.com/topfreegames/extensions/kafka_producer.go:90.37,91.25 1 2 +github.com/topfreegames/extensions/kafka_producer.go:92.3,94.37 2 2 +github.com/topfreegames/extensions/kafka_producer.go:107.4,107.9 1 2 +github.com/topfreegames/extensions/kafka_producer.go:108.3,109.65 1 0 +github.com/topfreegames/extensions/kafka_producer.go:94.37,100.5 2 0 +github.com/topfreegames/extensions/kafka_producer.go:100.5,106.5 1 2 +github.com/topfreegames/extensions/kafka_producer.go:115.67,124.2 2 1 github.com/topfreegames/extensions/pg.go:43.99,49.23 3 10 github.com/topfreegames/extensions/pg.go:52.2,53.16 2 10 github.com/topfreegames/extensions/pg.go:57.2,59.16 3 10 @@ -79,7 +79,7 @@ github.com/topfreegames/extensions/pg.go:63.2,63.20 1 10 github.com/topfreegames/extensions/pg.go:49.23,51.3 1 10 github.com/topfreegames/extensions/pg.go:53.16,55.3 1 0 github.com/topfreegames/extensions/pg.go:59.16,61.3 1 0 -github.com/topfreegames/extensions/pg.go:67.67,85.15 9 10 +github.com/topfreegames/extensions/pg.go:67.52,85.15 9 10 github.com/topfreegames/extensions/pg.go:91.2,91.12 1 10 github.com/topfreegames/extensions/pg.go:85.15,87.3 1 0 github.com/topfreegames/extensions/pg.go:87.3,89.3 1 10 diff --git a/kafka_consumer.go b/kafka_consumer.go index 75b5e1c..09d5002 100644 --- a/kafka_consumer.go +++ b/kafka_consumer.go @@ -76,23 +76,23 @@ func NewKafkaConsumer( } func (q *KafkaConsumer) loadConfigurationDefaults() { - q.Config.SetDefault("queue.topics", []string{"com.games.test"}) - q.Config.SetDefault("queue.brokers", "localhost:9092") - q.Config.SetDefault("queue.channelSize", 100) - q.Config.SetDefault("queue.group", "test") - q.Config.SetDefault("queue.sessionTimeout", 6000) - q.Config.SetDefault("queue.offsetResetStrategy", "latest") - q.Config.SetDefault("queue.handleAllMessagesBeforeExiting", true) + q.Config.SetDefault("extensions.kafkaconsumer.topics", []string{"com.games.test"}) + q.Config.SetDefault("extensions.kafkaconsumer.brokers", "localhost:9092") + q.Config.SetDefault("extensions.kafkaconsumer.channelSize", 100) + q.Config.SetDefault("extensions.kafkaconsumer.group", "test") + q.Config.SetDefault("extensions.kafkaconsumer.sessionTimeout", 6000) + q.Config.SetDefault("extensions.kafkaconsumer.offsetResetStrategy", "latest") + q.Config.SetDefault("extensions.kafkaconsumer.handleAllMessagesBeforeExiting", true) } func (q *KafkaConsumer) configure(client interfaces.KafkaConsumerClient) error { - q.OffsetResetStrategy = q.Config.GetString("queue.offsetResetStrategy") - q.Brokers = q.Config.GetString("queue.brokers") - q.ConsumerGroup = q.Config.GetString("queue.group") - q.SessionTimeout = q.Config.GetInt("queue.sessionTimeout") - q.Topics = q.Config.GetStringSlice("queue.topics") - q.ChannelSize = q.Config.GetInt("queue.channelSize") - q.HandleAllMessagesBeforeExiting = q.Config.GetBool("queue.handleAllMessagesBeforeExiting") + q.OffsetResetStrategy = q.Config.GetString("extensions.kafkaconsumer.offsetResetStrategy") + q.Brokers = q.Config.GetString("extensions.kafkaconsumer.brokers") + q.ConsumerGroup = q.Config.GetString("extensions.kafkaconsumer.group") + q.SessionTimeout = q.Config.GetInt("extensions.kafkaconsumer.sessionTimeout") + q.Topics = q.Config.GetStringSlice("extensions.kafkaconsumer.topics") + q.ChannelSize = q.Config.GetInt("extensions.kafkaconsumer.channelSize") + q.HandleAllMessagesBeforeExiting = q.Config.GetBool("extensions.kafkaconsumer.handleAllMessagesBeforeExiting") q.msgChan = make(chan []byte, q.ChannelSize) diff --git a/kafka_consumer_test.go b/kafka_consumer_test.go index 814d7e4..8f3e608 100644 --- a/kafka_consumer_test.go +++ b/kafka_consumer_test.go @@ -67,12 +67,12 @@ var _ = Describe("Kafka Extension", func() { BeforeEach(func() { kafkaConsumerClientMock = mocks.NewKafkaConsumerClientMock() config := viper.New() - config.Set("queue.topics", []string{"com.games.test"}) - config.Set("queue.brokers", "localhost:9941") - config.Set("queue.group", "testGroup") - config.Set("queue.sessionTimeout", 6000) - config.Set("queue.offsetResetStrategy", "latest") - config.Set("queue.handleAllMessagesBeforeExiting", true) + config.Set("extensions.kafkaconsumer.topics", []string{"com.games.test"}) + config.Set("extensions.kafkaconsumer.brokers", "localhost:9941") + config.Set("extensions.kafkaconsumer.group", "testGroup") + config.Set("extensions.kafkaconsumer.sessionTimeout", 6000) + config.Set("extensions.kafkaconsumer.offsetResetStrategy", "latest") + config.Set("extensions.kafkaconsumer.handleAllMessagesBeforeExiting", true) var err error consumer, err = NewKafkaConsumer(config, logger, kafkaConsumerClientMock) @@ -112,7 +112,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should assign partition", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0] startConsuming() defer consumer.StopConsuming() part := kafka.TopicPartition{ @@ -128,7 +128,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should log error if fails to assign partition", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0] startConsuming() defer consumer.StopConsuming() @@ -149,7 +149,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should revoke partitions", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0] startConsuming() defer consumer.StopConsuming() part := kafka.TopicPartition{ @@ -165,7 +165,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should stop loop if fails to revoke partitions", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0] startConsuming() defer consumer.StopConsuming() part := kafka.TopicPartition{ @@ -182,7 +182,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should receive message", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0] startConsuming() defer consumer.StopConsuming() part := kafka.TopicPartition{ @@ -247,12 +247,12 @@ var _ = Describe("Kafka Extension", func() { Expect(err).NotTo(HaveOccurred()) cons.loadConfigurationDefaults() - Expect(cnf.GetStringSlice("queue.topics")).To(Equal([]string{"com.games.test"})) - Expect(cnf.GetString("queue.brokers")).To(Equal("localhost:9092")) - Expect(cnf.GetString("queue.group")).To(Equal("test")) - Expect(cnf.GetInt("queue.sessionTimeout")).To(Equal(6000)) - Expect(cnf.GetString("queue.offsetResetStrategy")).To(Equal("latest")) - Expect(cnf.GetBool("queue.handleAllMessagesBeforeExiting")).To(BeTrue()) + Expect(cnf.GetStringSlice("extensions.kafkaconsumer.topics")).To(Equal([]string{"com.games.test"})) + Expect(cnf.GetString("extensions.kafkaconsumer.brokers")).To(Equal("localhost:9092")) + Expect(cnf.GetString("extensions.kafkaconsumer.group")).To(Equal("test")) + Expect(cnf.GetInt("extensions.kafkaconsumer.sessionTimeout")).To(Equal(6000)) + Expect(cnf.GetString("extensions.kafkaconsumer.offsetResetStrategy")).To(Equal("latest")) + Expect(cnf.GetBool("extensions.kafkaconsumer.handleAllMessagesBeforeExiting")).To(BeTrue()) }) }) diff --git a/kafka_producer.go b/kafka_producer.go index a324dac..5e15226 100644 --- a/kafka_producer.go +++ b/kafka_producer.go @@ -54,13 +54,12 @@ func NewKafkaProducer(config *viper.Viper, logger *log.Logger, clientOrNil ...in } func (q *KafkaProducer) loadConfigurationDefaults() { - q.Config.SetDefault("feedback.kafka.topic", "com.games.test.feedbacks") - q.Config.SetDefault("feedback.kafka.brokers", "localhost:9941") + q.Config.SetDefault("extensions.kafkaproducer.brokers", "localhost:9941") } func (q *KafkaProducer) configure(producer interfaces.KafkaProducerClient) error { q.loadConfigurationDefaults() - q.Brokers = q.Config.GetString("feedback.kafka.brokers") + q.Brokers = q.Config.GetString("extensions.kafkaproducer.brokers") c := &kafka.ConfigMap{ "bootstrap.servers": q.Brokers, } diff --git a/pg.go b/pg.go index 41a8b50..fa332b8 100644 --- a/pg.go +++ b/pg.go @@ -49,12 +49,12 @@ func NewPGClient(prefix string, config *viper.Viper, pgOrNil ...interfaces.DB) ( if len(pgOrNil) == 1 { db = pgOrNil[0] } - err := client.Connect(prefix, db) + err := client.Connect(db) if err != nil { return nil, err } - timeout := config.GetInt(fmt.Sprintf("%s.connectionTimeout", prefix)) + timeout := config.GetInt("extensions.pg.connectionTimeout") err = client.WaitForConnection(timeout) if err != nil { return nil, err @@ -64,14 +64,14 @@ func NewPGClient(prefix string, config *viper.Viper, pgOrNil ...interfaces.DB) ( } // Connect to PG -func (c *PGClient) Connect(prefix string, db interfaces.DB) error { - user := c.Config.GetString(fmt.Sprintf("%s.user", prefix)) - pass := c.Config.GetString(fmt.Sprintf("%s.pass", prefix)) - host := c.Config.GetString(fmt.Sprintf("%s.host", prefix)) - database := c.Config.GetString(fmt.Sprintf("%s.database", prefix)) - port := c.Config.GetInt(fmt.Sprintf("%s.port", prefix)) - poolSize := c.Config.GetInt(fmt.Sprintf("%s.poolSize", prefix)) - maxRetries := c.Config.GetInt(fmt.Sprintf("%s.maxRetries", prefix)) +func (c *PGClient) Connect(db interfaces.DB) error { + user := c.Config.GetString("extensions.pg.user") + pass := c.Config.GetString("extensions.pg.pass") + host := c.Config.GetString("extensions.pg.host") + database := c.Config.GetString("extensions.pg.database") + port := c.Config.GetInt("extensions.pg.port") + poolSize := c.Config.GetInt("extensions.pg.poolSize") + maxRetries := c.Config.GetInt("extensions.pg.maxRetries") c.Options = &pg.Options{ Addr: fmt.Sprintf("%s:%d", host, port), diff --git a/statsd.go b/statsd.go index 9bf8db1..ab9bff2 100644 --- a/statsd.go +++ b/statsd.go @@ -53,17 +53,17 @@ func NewStatsD(config *viper.Viper, logger *logrus.Logger, clientOrNil ...interf } func (s *StatsD) loadConfigurationDefaults() { - s.Config.SetDefault("stats.statsd.host", "localhost:8125") - s.Config.SetDefault("stats.statsd.prefix", "test") - s.Config.SetDefault("stats.statsd.flushIntervalMs", 5000) + s.Config.SetDefault("extensions.statsd.host", "localhost:8125") + s.Config.SetDefault("extensions.statsd.prefix", "test") + s.Config.SetDefault("extensions.statsd.flushIntervalMs", 5000) } func (s *StatsD) configure(client interfaces.StatsDClient) error { s.loadConfigurationDefaults() - host := s.Config.GetString("stats.statsd.host") - prefix := s.Config.GetString("stats.statsd.prefix") - flushIntervalMs := s.Config.GetInt("stats.statsd.flushIntervalMs") + host := s.Config.GetString("extensions.statsd.host") + prefix := s.Config.GetString("extensions.statsd.prefix") + flushIntervalMs := s.Config.GetInt("extensions.statsd.flushIntervalMs") flushPeriod := time.Duration(flushIntervalMs) * time.Millisecond l := s.Logger.WithFields(logrus.Fields{