Skip to content

Commit

Permalink
config should be less specific
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed Jan 24, 2017
1 parent 58d93dc commit ad48c9e
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 87 deletions.
45 changes: 23 additions & 22 deletions config/test.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
---
queue:
topics:
- "com.games.test"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
offsetResetStrategy: latest
handleAllMessagesBeforeExiting: true
statsd:
host: "localhost:40001"
prefix: "push"
flushIntervalMs: 5000
pg:
table: "test_table"
host: localhost
port: 8585
user: pg_user
pass: ""
poolSize: 20
maxRetries: 3
database: test_db
connectionTimeout: 100
extensions:
kafkaconsumer:
topics:
- "com.games.test"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
offsetResetStrategy: latest
handleAllMessagesBeforeExiting: true
statsd:
host: "localhost:40001"
prefix: "push"
flushIntervalMs: 5000
pg:
table: "test_table"
host: localhost
port: 8585
user: pg_user
pass: ""
poolSize: 20
maxRetries: 3
database: test_db
connectionTimeout: 100
30 changes: 15 additions & 15 deletions extensions.coverprofile
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,28 @@ github.com/topfreegames/extensions/kafka_consumer.go:313.17,315.4 1 1
github.com/topfreegames/extensions/kafka_producer.go:43.135,49.27 3 2
github.com/topfreegames/extensions/kafka_producer.go:52.2,53.15 2 2
github.com/topfreegames/extensions/kafka_producer.go:49.27,51.3 1 2
github.com/topfreegames/extensions/kafka_producer.go:56.53,59.2 2 2
github.com/topfreegames/extensions/kafka_producer.go:61.82,72.21 6 2
github.com/topfreegames/extensions/kafka_producer.go:82.2,84.12 3 2
github.com/topfreegames/extensions/kafka_producer.go:72.21,75.17 3 0
github.com/topfreegames/extensions/kafka_producer.go:75.17,78.4 2 0
github.com/topfreegames/extensions/kafka_producer.go:79.3,81.3 1 2
github.com/topfreegames/extensions/kafka_producer.go:87.51,91.37 2 2
github.com/topfreegames/extensions/kafka_producer.go:91.37,92.25 1 2
github.com/topfreegames/extensions/kafka_producer.go:93.3,95.37 2 2
github.com/topfreegames/extensions/kafka_producer.go:108.4,108.9 1 2
github.com/topfreegames/extensions/kafka_producer.go:109.3,110.65 1 0
github.com/topfreegames/extensions/kafka_producer.go:95.37,101.5 2 0
github.com/topfreegames/extensions/kafka_producer.go:101.5,107.5 1 2
github.com/topfreegames/extensions/kafka_producer.go:116.67,125.2 2 1
github.com/topfreegames/extensions/kafka_producer.go:56.53,58.2 1 2
github.com/topfreegames/extensions/kafka_producer.go:60.82,71.21 6 2
github.com/topfreegames/extensions/kafka_producer.go:81.2,83.12 3 2
github.com/topfreegames/extensions/kafka_producer.go:71.21,74.17 3 0
github.com/topfreegames/extensions/kafka_producer.go:74.17,77.4 2 0
github.com/topfreegames/extensions/kafka_producer.go:78.3,80.3 1 2
github.com/topfreegames/extensions/kafka_producer.go:86.51,90.37 2 2
github.com/topfreegames/extensions/kafka_producer.go:90.37,91.25 1 2
github.com/topfreegames/extensions/kafka_producer.go:92.3,94.37 2 2
github.com/topfreegames/extensions/kafka_producer.go:107.4,107.9 1 2
github.com/topfreegames/extensions/kafka_producer.go:108.3,109.65 1 0
github.com/topfreegames/extensions/kafka_producer.go:94.37,100.5 2 0
github.com/topfreegames/extensions/kafka_producer.go:100.5,106.5 1 2
github.com/topfreegames/extensions/kafka_producer.go:115.67,124.2 2 1
github.com/topfreegames/extensions/pg.go:43.99,49.23 3 10
github.com/topfreegames/extensions/pg.go:52.2,53.16 2 10
github.com/topfreegames/extensions/pg.go:57.2,59.16 3 10
github.com/topfreegames/extensions/pg.go:63.2,63.20 1 10
github.com/topfreegames/extensions/pg.go:49.23,51.3 1 10
github.com/topfreegames/extensions/pg.go:53.16,55.3 1 0
github.com/topfreegames/extensions/pg.go:59.16,61.3 1 0
github.com/topfreegames/extensions/pg.go:67.67,85.15 9 10
github.com/topfreegames/extensions/pg.go:67.52,85.15 9 10
github.com/topfreegames/extensions/pg.go:91.2,91.12 1 10
github.com/topfreegames/extensions/pg.go:85.15,87.3 1 0
github.com/topfreegames/extensions/pg.go:87.3,89.3 1 10
Expand Down
28 changes: 14 additions & 14 deletions kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,23 @@ func NewKafkaConsumer(
}

func (q *KafkaConsumer) loadConfigurationDefaults() {
q.Config.SetDefault("queue.topics", []string{"com.games.test"})
q.Config.SetDefault("queue.brokers", "localhost:9092")
q.Config.SetDefault("queue.channelSize", 100)
q.Config.SetDefault("queue.group", "test")
q.Config.SetDefault("queue.sessionTimeout", 6000)
q.Config.SetDefault("queue.offsetResetStrategy", "latest")
q.Config.SetDefault("queue.handleAllMessagesBeforeExiting", true)
q.Config.SetDefault("extensions.kafkaconsumer.topics", []string{"com.games.test"})
q.Config.SetDefault("extensions.kafkaconsumer.brokers", "localhost:9092")
q.Config.SetDefault("extensions.kafkaconsumer.channelSize", 100)
q.Config.SetDefault("extensions.kafkaconsumer.group", "test")
q.Config.SetDefault("extensions.kafkaconsumer.sessionTimeout", 6000)
q.Config.SetDefault("extensions.kafkaconsumer.offsetResetStrategy", "latest")
q.Config.SetDefault("extensions.kafkaconsumer.handleAllMessagesBeforeExiting", true)
}

func (q *KafkaConsumer) configure(client interfaces.KafkaConsumerClient) error {
q.OffsetResetStrategy = q.Config.GetString("queue.offsetResetStrategy")
q.Brokers = q.Config.GetString("queue.brokers")
q.ConsumerGroup = q.Config.GetString("queue.group")
q.SessionTimeout = q.Config.GetInt("queue.sessionTimeout")
q.Topics = q.Config.GetStringSlice("queue.topics")
q.ChannelSize = q.Config.GetInt("queue.channelSize")
q.HandleAllMessagesBeforeExiting = q.Config.GetBool("queue.handleAllMessagesBeforeExiting")
q.OffsetResetStrategy = q.Config.GetString("extensions.kafkaconsumer.offsetResetStrategy")
q.Brokers = q.Config.GetString("extensions.kafkaconsumer.brokers")
q.ConsumerGroup = q.Config.GetString("extensions.kafkaconsumer.group")
q.SessionTimeout = q.Config.GetInt("extensions.kafkaconsumer.sessionTimeout")
q.Topics = q.Config.GetStringSlice("extensions.kafkaconsumer.topics")
q.ChannelSize = q.Config.GetInt("extensions.kafkaconsumer.channelSize")
q.HandleAllMessagesBeforeExiting = q.Config.GetBool("extensions.kafkaconsumer.handleAllMessagesBeforeExiting")

q.msgChan = make(chan []byte, q.ChannelSize)

Expand Down
34 changes: 17 additions & 17 deletions kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ var _ = Describe("Kafka Extension", func() {
BeforeEach(func() {
kafkaConsumerClientMock = mocks.NewKafkaConsumerClientMock()
config := viper.New()
config.Set("queue.topics", []string{"com.games.test"})
config.Set("queue.brokers", "localhost:9941")
config.Set("queue.group", "testGroup")
config.Set("queue.sessionTimeout", 6000)
config.Set("queue.offsetResetStrategy", "latest")
config.Set("queue.handleAllMessagesBeforeExiting", true)
config.Set("extensions.kafkaconsumer.topics", []string{"com.games.test"})
config.Set("extensions.kafkaconsumer.brokers", "localhost:9941")
config.Set("extensions.kafkaconsumer.group", "testGroup")
config.Set("extensions.kafkaconsumer.sessionTimeout", 6000)
config.Set("extensions.kafkaconsumer.offsetResetStrategy", "latest")
config.Set("extensions.kafkaconsumer.handleAllMessagesBeforeExiting", true)

var err error
consumer, err = NewKafkaConsumer(config, logger, kafkaConsumerClientMock)
Expand Down Expand Up @@ -112,7 +112,7 @@ var _ = Describe("Kafka Extension", func() {
})

It("should assign partition", func() {
topic := consumer.Config.GetStringSlice("queue.topics")[0]
topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0]
startConsuming()
defer consumer.StopConsuming()
part := kafka.TopicPartition{
Expand All @@ -128,7 +128,7 @@ var _ = Describe("Kafka Extension", func() {
})

It("should log error if fails to assign partition", func() {
topic := consumer.Config.GetStringSlice("queue.topics")[0]
topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0]
startConsuming()
defer consumer.StopConsuming()

Expand All @@ -149,7 +149,7 @@ var _ = Describe("Kafka Extension", func() {
})

It("should revoke partitions", func() {
topic := consumer.Config.GetStringSlice("queue.topics")[0]
topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0]
startConsuming()
defer consumer.StopConsuming()
part := kafka.TopicPartition{
Expand All @@ -165,7 +165,7 @@ var _ = Describe("Kafka Extension", func() {
})

It("should stop loop if fails to revoke partitions", func() {
topic := consumer.Config.GetStringSlice("queue.topics")[0]
topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0]
startConsuming()
defer consumer.StopConsuming()
part := kafka.TopicPartition{
Expand All @@ -182,7 +182,7 @@ var _ = Describe("Kafka Extension", func() {
})

It("should receive message", func() {
topic := consumer.Config.GetStringSlice("queue.topics")[0]
topic := consumer.Config.GetStringSlice("extensions.kafkaconsumer.topics")[0]
startConsuming()
defer consumer.StopConsuming()
part := kafka.TopicPartition{
Expand Down Expand Up @@ -247,12 +247,12 @@ var _ = Describe("Kafka Extension", func() {
Expect(err).NotTo(HaveOccurred())
cons.loadConfigurationDefaults()

Expect(cnf.GetStringSlice("queue.topics")).To(Equal([]string{"com.games.test"}))
Expect(cnf.GetString("queue.brokers")).To(Equal("localhost:9092"))
Expect(cnf.GetString("queue.group")).To(Equal("test"))
Expect(cnf.GetInt("queue.sessionTimeout")).To(Equal(6000))
Expect(cnf.GetString("queue.offsetResetStrategy")).To(Equal("latest"))
Expect(cnf.GetBool("queue.handleAllMessagesBeforeExiting")).To(BeTrue())
Expect(cnf.GetStringSlice("extensions.kafkaconsumer.topics")).To(Equal([]string{"com.games.test"}))
Expect(cnf.GetString("extensions.kafkaconsumer.brokers")).To(Equal("localhost:9092"))
Expect(cnf.GetString("extensions.kafkaconsumer.group")).To(Equal("test"))
Expect(cnf.GetInt("extensions.kafkaconsumer.sessionTimeout")).To(Equal(6000))
Expect(cnf.GetString("extensions.kafkaconsumer.offsetResetStrategy")).To(Equal("latest"))
Expect(cnf.GetBool("extensions.kafkaconsumer.handleAllMessagesBeforeExiting")).To(BeTrue())
})
})

Expand Down
5 changes: 2 additions & 3 deletions kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@ func NewKafkaProducer(config *viper.Viper, logger *log.Logger, clientOrNil ...in
}

func (q *KafkaProducer) loadConfigurationDefaults() {
q.Config.SetDefault("feedback.kafka.topic", "com.games.test.feedbacks")
q.Config.SetDefault("feedback.kafka.brokers", "localhost:9941")
q.Config.SetDefault("extensions.kafkaproducer.brokers", "localhost:9941")
}

func (q *KafkaProducer) configure(producer interfaces.KafkaProducerClient) error {
q.loadConfigurationDefaults()
q.Brokers = q.Config.GetString("feedback.kafka.brokers")
q.Brokers = q.Config.GetString("extensions.kafkaproducer.brokers")
c := &kafka.ConfigMap{
"bootstrap.servers": q.Brokers,
}
Expand Down
20 changes: 10 additions & 10 deletions pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func NewPGClient(prefix string, config *viper.Viper, pgOrNil ...interfaces.DB) (
if len(pgOrNil) == 1 {
db = pgOrNil[0]
}
err := client.Connect(prefix, db)
err := client.Connect(db)
if err != nil {
return nil, err
}

timeout := config.GetInt(fmt.Sprintf("%s.connectionTimeout", prefix))
timeout := config.GetInt("extensions.pg.connectionTimeout")
err = client.WaitForConnection(timeout)
if err != nil {
return nil, err
Expand All @@ -64,14 +64,14 @@ func NewPGClient(prefix string, config *viper.Viper, pgOrNil ...interfaces.DB) (
}

// Connect to PG
func (c *PGClient) Connect(prefix string, db interfaces.DB) error {
user := c.Config.GetString(fmt.Sprintf("%s.user", prefix))
pass := c.Config.GetString(fmt.Sprintf("%s.pass", prefix))
host := c.Config.GetString(fmt.Sprintf("%s.host", prefix))
database := c.Config.GetString(fmt.Sprintf("%s.database", prefix))
port := c.Config.GetInt(fmt.Sprintf("%s.port", prefix))
poolSize := c.Config.GetInt(fmt.Sprintf("%s.poolSize", prefix))
maxRetries := c.Config.GetInt(fmt.Sprintf("%s.maxRetries", prefix))
func (c *PGClient) Connect(db interfaces.DB) error {
user := c.Config.GetString("extensions.pg.user")
pass := c.Config.GetString("extensions.pg.pass")
host := c.Config.GetString("extensions.pg.host")
database := c.Config.GetString("extensions.pg.database")
port := c.Config.GetInt("extensions.pg.port")
poolSize := c.Config.GetInt("extensions.pg.poolSize")
maxRetries := c.Config.GetInt("extensions.pg.maxRetries")

c.Options = &pg.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
Expand Down
12 changes: 6 additions & 6 deletions statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ func NewStatsD(config *viper.Viper, logger *logrus.Logger, clientOrNil ...interf
}

func (s *StatsD) loadConfigurationDefaults() {
s.Config.SetDefault("stats.statsd.host", "localhost:8125")
s.Config.SetDefault("stats.statsd.prefix", "test")
s.Config.SetDefault("stats.statsd.flushIntervalMs", 5000)
s.Config.SetDefault("extensions.statsd.host", "localhost:8125")
s.Config.SetDefault("extensions.statsd.prefix", "test")
s.Config.SetDefault("extensions.statsd.flushIntervalMs", 5000)
}

func (s *StatsD) configure(client interfaces.StatsDClient) error {
s.loadConfigurationDefaults()

host := s.Config.GetString("stats.statsd.host")
prefix := s.Config.GetString("stats.statsd.prefix")
flushIntervalMs := s.Config.GetInt("stats.statsd.flushIntervalMs")
host := s.Config.GetString("extensions.statsd.host")
prefix := s.Config.GetString("extensions.statsd.prefix")
flushIntervalMs := s.Config.GetInt("extensions.statsd.flushIntervalMs")
flushPeriod := time.Duration(flushIntervalMs) * time.Millisecond

l := s.Logger.WithFields(logrus.Fields{
Expand Down

0 comments on commit ad48c9e

Please sign in to comment.