Skip to content

Commit

Permalink
Merge pull request #33 from hlegarda/feature/msk-support
Browse files Browse the repository at this point in the history
feat: adding config properties map for kafka consumer
  • Loading branch information
githubjianli authored Oct 23, 2024
2 parents a4bad31 + 8c3b786 commit 2881010
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.0.1] - 2024-10-24
### Added
* Support for consumer properties allowing connecting to Kafka cloud provider.

## [1.0.0] - 2023-04-27
### Changed
* Upgrade `Springboot` version from `2.3.3.RELEASE` to `2.7.10`.
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,26 @@ The table below describes all the available configuration values for Drone Fly.
| instance.name | Instance name for a Drone Fly instance. `instance.name` is also used to derive the Kafka consumer group. Therefore, in a multi-instance deployment, a unique `instance.name` for each Drone Fly instance needs to be provided to avoid all instances ending up in the same Kafka consumer group. | `string` | `drone-fly` | no |
| endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no |

### Additional configuration parameters
The Kafka message reader supports properties that are passed to the Kafka consumer builder.
These are environment variables with the PREFIX apiary.messaging.consumer.

#### Example for sending consumer parameters when using a Kafka cloud provider
- apiary.messaging.consumer.security.protocol=SSL
- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM
- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file

java -Dloader.path=lib/ -jar drone-fly-app-<version>-exec.jar \
--apiary.bootstrap.servers=localhost:9092 \
--apiary.kafka.topic.name=apiary \
--apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \
--apiary.messaging.consumer.security.protocol=SSL \
--apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \
--apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \
--apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

## Metrics

Expand Down
5 changes: 5 additions & 0 deletions drone-fly-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.expediagroup.dataplatform.dronefly.app.context;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -24,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -32,10 +34,12 @@
import com.expediagroup.dataplatform.dronefly.app.messaging.MessageReaderAdapter;
import com.expediagroup.dataplatform.dronefly.app.service.ListenerCatalog;
import com.expediagroup.dataplatform.dronefly.app.service.factory.ListenerCatalogFactory;
import org.springframework.context.annotation.Primary;

@Configuration
public class CommonBeans {
private static final Logger log = LoggerFactory.getLogger(CommonBeans.class);
public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer";

@Value("${instance.name:drone-fly}")
private String instanceName;
Expand All @@ -54,6 +58,13 @@ public HiveConf hiveConf() {
return new HiveConf();
}

@Bean
@Primary
@ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX)
public Properties getEnvProperties() {
return new Properties();
}

@Bean
public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException {
ListenerCatalog listenerCatalog = new ListenerCatalogFactory(conf).newInstance(confListenerList);
Expand All @@ -65,8 +76,21 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException {

@Bean
public MessageReaderAdapter messageReaderAdapter() {
KafkaMessageReader delegate = KafkaMessageReaderBuilder.builder(bootstrapServers, topicName, instanceName).build();
Properties consumerProperties = getConsumerProperties();
KafkaMessageReader delegate = KafkaMessageReaderBuilder.
builder(bootstrapServers, topicName, instanceName).
withConsumerProperties(consumerProperties).
build();
return new MessageReaderAdapter(delegate);
}

}
private Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
getEnvProperties().forEach((key, value) -> {
consumerProperties.put(key.toString(), value.toString());
log.info("Consumer property {} set with value: {}", key, value);
} );
return consumerProperties;
}

}
5 changes: 5 additions & 0 deletions drone-fly-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
</project>

0 comments on commit 2881010

Please sign in to comment.