Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(out_kafka2): adds support for AWS IAM authentication to MSK usin… #481

Merged
merged 1 commit into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
share_producer (bool) :default => false

# If you intend to rely on AWS IAM auth to MSK with long lived credentials
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
#
# For AWS STS support, see status in
# - https://github.com/zendesk/ruby-kafka/issues/944
# - https://github.com/zendesk/ruby-kafka/pull/951
sasl_aws_msk_iam_access_key_id (string) :default => nil
sasl_aws_msk_iam_secret_key_id (string) :default => nil
sasl_aws_msk_iam_aws_region (string) :default => nil

<format>
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
</format>
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |gem|

gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency 'ltsv'
gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/kafka_plugin_util.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
module Fluent
module KafkaPluginUtil
module AwsIamSettings
def self.included(klass)
klass.instance_eval do
config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
desc: "AWS access key Id for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
desc: "AWS access key secret for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
desc: "AWS region for IAM authentication to MSK."
end
end
end

module SSLSettings
def self.included(klass)
klass.instance_eval {
Expand Down
22 changes: 22 additions & 0 deletions lib/fluent/plugin/out_kafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output
config_set_default :@type, 'json'
end

include Fluent::KafkaPluginUtil::AwsIamSettings
include Fluent::KafkaPluginUtil::SSLSettings
include Fluent::KafkaPluginUtil::SaslSettings

Expand All @@ -113,6 +114,7 @@ def initialize
def refresh_client(raise_error = true)
begin
logger = @get_kafka_client_log ? log : nil
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
if @scram_mechanism != nil && @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
Expand All @@ -125,6 +127,26 @@ def refresh_client(raise_error = true)
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
elsif use_long_lived_aws_credentials
@kafka = Kafka.new(
seed_brokers: @seed_brokers,
client_id: @client_id,
logger: logger,
connect_timeout: @connect_timeout,
socket_timeout: @socket_timeout,
ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert),
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname,
resolve_seed_brokers: @resolve_seed_brokers,
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
)
else
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
Expand Down