diff --git a/README.md b/README.md index 277cfc2..eb9a3ab 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32' share_producer (bool) :default => false + # If you intend to rely on AWS IAM auth to MSK with long lived credentials + # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html + # + # For AWS STS support, see status in + # - https://github.com/zendesk/ruby-kafka/issues/944 + # - https://github.com/zendesk/ruby-kafka/pull/951 + sasl_aws_msk_iam_access_key_id (string) :default => nil + sasl_aws_msk_iam_secret_key_id (string) :default => nil + sasl_aws_msk_iam_aws_region (string) :default => nil + @type (json|ltsv|msgpack|attr:|) :default => json diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index 3763922..d3b1eea 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.10.58", "< 2"] gem.add_dependency 'ltsv' - gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2' + gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/kafka_plugin_util.rb b/lib/fluent/plugin/kafka_plugin_util.rb index 1695111..6569f61 100644 --- a/lib/fluent/plugin/kafka_plugin_util.rb +++ b/lib/fluent/plugin/kafka_plugin_util.rb @@ -1,5 +1,18 @@ module Fluent module KafkaPluginUtil + module AwsIamSettings + def self.included(klass) + klass.instance_eval do + config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true, + desc: "AWS access key Id for IAM authentication to MSK." + config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true, + desc: "AWS access key secret for IAM authentication to MSK." + config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil, + desc: "AWS region for IAM authentication to MSK." + end + end + end + module SSLSettings def self.included(klass) klass.instance_eval { diff --git a/lib/fluent/plugin/out_kafka2.rb b/lib/fluent/plugin/out_kafka2.rb index 9f17c09..0402b99 100644 --- a/lib/fluent/plugin/out_kafka2.rb +++ b/lib/fluent/plugin/out_kafka2.rb @@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output config_set_default :@type, 'json' end + include Fluent::KafkaPluginUtil::AwsIamSettings include Fluent::KafkaPluginUtil::SSLSettings include Fluent::KafkaPluginUtil::SaslSettings @@ -113,6 +114,7 @@ def initialize def refresh_client(raise_error = true) begin logger = @get_kafka_client_log ? log : nil + use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil if @scram_mechanism != nil && @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert, ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), @@ -125,6 +127,26 @@ def refresh_client(raise_error = true) ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers, partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)) + elsif use_long_lived_aws_credentials + @kafka = Kafka.new( + seed_brokers: @seed_brokers, + client_id: @client_id, + logger: logger, + connect_timeout: @connect_timeout, + socket_timeout: @socket_timeout, + ssl_ca_cert_file_path: @ssl_ca_cert, + ssl_client_cert: read_ssl_file(@ssl_client_cert), + ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), + ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), + ssl_ca_certs_from_system: @ssl_ca_certs_from_system, + sasl_over_ssl: @sasl_over_ssl, + ssl_verify_hostname: @ssl_verify_hostname, + resolve_seed_brokers: @resolve_seed_brokers, + sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id, + sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id, + sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region, + partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function) + ) else @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert, ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),