Skip to content

Commit

Permalink
fix: code review comments, renaming client to consumer, modifying con…
Browse files Browse the repository at this point in the history
…f parameters to fill only with prefix params
  • Loading branch information
hlegarda committed Oct 23, 2024
1 parent 22360e4 commit 7f50d55
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 23 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ The table below describes all the available configuration values for Drone Fly.
| endpoint.port | Port on which Drone Fly Spring Boot app will start. | `string` | `8008` | no |

### Additional configuration parameters
The Kafka message reader supports client properties that are passed to the Kafka consumer builder.
These are environment variables with the PREFIX apiary.messaging.client.
The Kafka message reader supports properties that are passed to the Kafka consumer builder.
These are environment variables with the PREFIX apiary.messaging.consumer.

#### Example for sending client parameters when using a Kafka cloud provider
- apiary.messaging.client.security.protocol=SSL
- apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM
- apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
- apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
#### Example for sending consumer parameters when using a Kafka cloud provider
- apiary.messaging.consumer.security.protocol=SSL
- apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM
- apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
- apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

In this case we are sending the properties to Kafka's consumer to be able to connect to AWS MSK which also requires the IAM library included as a dependency in the POM.xml file

java -Dloader.path=lib/ -jar drone-fly-app-<version>-exec.jar \
--apiary.bootstrap.servers=localhost:9092 \
--apiary.kafka.topic.name=apiary \
--apiary.listener.list="com.expediagroup.sampleListener1,com.expediagroup.sampleListener2" \
--apiary.messaging.client.security.protocol=SSL \
--apiary.messaging.client.sasl.mechanism=AWS_MSK_IAM \
--apiary.messaging.client.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \
--apiary.messaging.client.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
--apiary.messaging.consumer.security.protocol=SSL \
--apiary.messaging.consumer.sasl.mechanism=AWS_MSK_IAM \
--apiary.messaging.consumer.sasl_jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; \
--apiary.messaging.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

## Metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
Expand All @@ -40,7 +39,7 @@
@Configuration
public class CommonBeans {
private static final Logger log = LoggerFactory.getLogger(CommonBeans.class);
public static final String PREFIX = "apiary.messaging.client.";
public static final String CONSUMER_PROPERTIES_PREFIX = "apiary.messaging.consumer";

@Value("${instance.name:drone-fly}")
private String instanceName;
Expand All @@ -61,7 +60,7 @@ public HiveConf hiveConf() {

@Bean
@Primary
@ConfigurationProperties
@ConfigurationProperties(CONSUMER_PROPERTIES_PREFIX)
public Properties getEnvProperties() {
return new Properties();
}
Expand All @@ -77,24 +76,21 @@ public ListenerCatalog listenerCatalog(HiveConf conf) throws MetaException {

@Bean
public MessageReaderAdapter messageReaderAdapter() {
Properties clientProperties = getClientProperties();
Properties clientProperties = getConsumerProperties();
KafkaMessageReader delegate = KafkaMessageReaderBuilder.
builder(bootstrapServers, topicName, instanceName).
withConsumerProperties(clientProperties).
build();
return new MessageReaderAdapter(delegate);
}

private Properties getClientProperties() {
Properties clientProperties = new Properties();
private Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
getEnvProperties().forEach((key, value) -> {
if (key.toString().startsWith(PREFIX)) {
String keyWithoutPrefix = StringUtils.replace(key.toString(), PREFIX, "");
clientProperties.put(keyWithoutPrefix, value.toString());
log.info("Client property {} set with value: {}", keyWithoutPrefix, value);
}
consumerProperties.put(key.toString(), value.toString());
log.info("Client property {} set with value: {}", key, value);
} );
return clientProperties;
return consumerProperties;
}

}

0 comments on commit 7f50d55

Please sign in to comment.