diff --git a/lib/fluent/plugin/in_s3.rb b/lib/fluent/plugin/in_s3.rb index 5c54298a..1244b49d 100644 --- a/lib/fluent/plugin/in_s3.rb +++ b/lib/fluent/plugin/in_s3.rb @@ -76,8 +76,8 @@ def initialize desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']" config_param :profile_name, :string, default: nil end - desc "S3 bucket name" - config_param :s3_bucket, :string + desc "S3 bucket name(s) separated by commas in case of multiple bucket names" + config_param :s3_buckets, :string desc "S3 region name" config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1" desc "Use 's3_region' instead" @@ -96,6 +96,8 @@ def initialize config_param :queue_name, :string, default: nil desc "SQS Owner Account ID" config_param :queue_owner_aws_account_id, :string, default: nil + desc "SQS queue url, when passed it'll not get the queue URL by name & account ID" + config_param :queue_url, :string, default: nil desc "Use 's3_region' instead" config_param :endpoint, :string, default: nil desc "Skip message deletion" @@ -106,6 +108,7 @@ def initialize config_param :retry_error_interval, :integer, default: 300 end + # Default tag will include input.s3.bucket_name ###Check the function process(body) desc "Tag string" config_param :tag, :string, default: "input.s3" @@ -145,16 +148,29 @@ def start s3_client = create_s3_client log.debug("Succeeded to create S3 client") @s3 = Aws::S3::Resource.new(client: s3_client) - @bucket = @s3.bucket(@s3_bucket) - - raise "#{@bucket.name} is not found." unless @bucket.exists? + @buckets = {} + if (@s3_buckets.include?(",")) + splitted_buckets = @s3_buckets.split(',') + splitted_buckets.each do | bucket | + @buckets[bucket] = @s3.bucket(bucket) + raise "#{bucket} is not found." unless @buckets[bucket].exists? + end + else + @buckets[@s3_buckets] = @s3.bucket(@s3_buckets) + raise "#{@s3_buckets} is not found." unless @buckets[@s3_buckets].exists? + end check_apikeys if @check_apikey_on_start sqs_client = create_sqs_client log.debug("Succeeded to create SQS client") - response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id) - sqs_queue_url = response.queue_url + sqs_queue_url = nil + if (@sqs.queue_url.nil?) + response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id) + sqs_queue_url = response.queue_url + else + sqs_queue_url = @sqs.queue_url + end log.debug("Succeeded to get SQS queue URL") @poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client) @@ -279,8 +295,10 @@ def create_sqs_client end def check_apikeys - @bucket.objects.first - log.debug("Succeeded to verify API keys") + @buckets.each do | bucket_name, bucket_object | + bucket_object.objects.first + log.debug("Succeeded to verify API keys for bucket #{bucket_name}") + end rescue => e raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}" end @@ -288,20 +306,28 @@ def check_apikeys def process(body) s3 = body["Records"].first["s3"] raw_key = s3["object"]["key"] + raw_bucket_name = s3["bucket"]["name"] key = CGI.unescape(raw_key) - io = @bucket.object(key).get.body + if (!@buckets.key?(raw_bucket_name)) + raise "S3 bucket name: #{raw_bucket_name} returned from SQS was not provided in the input configuration as one of the s3 fluentd sources." + end + + io = @buckets[raw_bucket_name].object(key).get.body content = @extractor.extract(io) es = Fluent::MultiEventStream.new content.each_line do |line| @parser.parse(line) do |time, record| if @add_object_metadata - record['s3_bucket'] = @s3_bucket + record['s3_bucket'] = raw_bucket_name record['s3_key'] = raw_key end es.add(time, record) end end + if (@tag == "input.s3") + @tag = "input.s3.#{raw_bucket_name}" + end router.emit_stream(@tag, es) end diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb index 2c715dd0..6995200c 100644 --- a/test/test_in_s3.rb +++ b/test/test_in_s3.rb @@ -28,7 +28,7 @@ def setup CONFIG = %[ aws_key_id test_key_id aws_sec_key test_sec_key - s3_bucket test_bucket + s3_buckets test_bucket buffer_type memory queue_name test_queue @@ -47,7 +47,7 @@ def test_default actual = { aws_key_id: d.instance.aws_key_id, aws_sec_key: d.instance.aws_sec_key, - s3_bucket: d.instance.s3_bucket, + s3_buckets: d.instance.s3_buckets, s3_region: d.instance.s3_region, sqs_queue_name: d.instance.sqs.queue_name, extractor_ext: extractor.ext, @@ -56,7 +56,7 @@ def test_default expected = { aws_key_id: "test_key_id", aws_sec_key: "test_sec_key", - s3_bucket: "test_bucket", + s3_buckets: "test_bucket", s3_region: "us-east-1", sqs_queue_name: "test_queue", extractor_ext: "gz", @@ -65,6 +65,77 @@ def test_default assert_equal(expected, actual) end + def test_with_multiple_buckets + conf = %[ + aws_key_id test_key_id + aws_sec_key test_sec_key + s3_buckets test_bucket1,test_bucket2 + buffer_type memory + + queue_name test_queue + queue_owner_aws_account_id 123456789123 + + ] + d = create_driver(conf) + extractor = d.instance.instance_variable_get(:@extractor) + actual = { + aws_key_id: d.instance.aws_key_id, + aws_sec_key: d.instance.aws_sec_key, + s3_buckets: d.instance.s3_buckets, + s3_region: d.instance.s3_region, + sqs_queue_name: d.instance.sqs.queue_name, + extractor_ext: extractor.ext, + extractor_content_type: extractor.content_type + } + expected = { + aws_key_id: "test_key_id", + aws_sec_key: "test_sec_key", + s3_buckets: "test_bucket1,test_bucket2", + s3_region: "us-east-1", + sqs_queue_name: "test_queue", + extractor_ext: "gz", + extractor_content_type: "application/x-gzip" + } + assert_equal(expected, actual) + end + + def test_with_multiple_buckets_and_sqs_queue_url_override + conf = %[ + aws_key_id test_key_id + aws_sec_key test_sec_key + s3_buckets test_bucket1,test_bucket2 + buffer_type memory + + queue_name test_queue + queue_owner_aws_account_id 123456789123 + queue_url https://sqs.us-east-1.amazonaws.com/345678912345/test_override_queue + + ] + d = create_driver(conf) + extractor = d.instance.instance_variable_get(:@extractor) + actual = { + aws_key_id: d.instance.aws_key_id, + aws_sec_key: d.instance.aws_sec_key, + s3_buckets: d.instance.s3_buckets, + s3_region: d.instance.s3_region, + sqs_queue_name: d.instance.sqs.queue_name, + extractor_ext: extractor.ext, + extractor_content_type: extractor.content_type, + sqs_queue_url: d.instance.sqs.queue_url + } + expected = { + aws_key_id: "test_key_id", + aws_sec_key: "test_sec_key", + s3_buckets: "test_bucket1,test_bucket2", + s3_region: "us-east-1", + sqs_queue_name: "test_queue", + extractor_ext: "gz", + extractor_content_type: "application/x-gzip", + sqs_queue_url: "https://sqs.us-east-1.amazonaws.com/345678912345/test_override_queue" + } + assert_equal(expected, actual) + end + def test_empty assert_raise(Fluent::ConfigError) do create_driver("") @@ -75,7 +146,7 @@ def test_without_sqs_section conf = %[ aws_key_id test_key_id aws_sec_key test_sec_key - s3_bucket test_bucket + s3_buckets test_bucket ] assert_raise_message("'' sections are required") do create_driver(conf) @@ -137,7 +208,7 @@ def test_sqs_endpoint_with_invalid_endpoint(endpoint) conf = <<"EOS" aws_key_id test_key_id aws_sec_key test_sec_key -s3_bucket test_bucket +s3_buckets test_bucket buffer_type memory queue_name test_queue @@ -157,7 +228,7 @@ def setup_mocks mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client } @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client)) mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource } - @s3_bucket = mock(Aws::S3::Bucket.new(name: "test", + @s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket", client: @s3_client)) @s3_bucket.exists? { true } @s3_resource.bucket(anything) { @s3_bucket } @@ -207,6 +278,9 @@ def test_one_record "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -242,6 +316,9 @@ def test_one_record_with_metadata "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -277,6 +354,9 @@ def test_one_record_url_encoded "s3" => { "object" => { "key" => "test+key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -312,6 +392,9 @@ def test_one_record_url_encoded_with_metadata "s3" => { "object" => { "key" => "test+key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -347,6 +430,9 @@ def test_one_record_multi_line "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -387,6 +473,9 @@ def test_one_record_multi_line_with_metadata "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -435,6 +524,9 @@ def test_gzip_single_stream "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } } @@ -486,6 +578,9 @@ def test_gzip_multiple_steams "s3" => { "object" => { "key" => "test_key" + }, + "bucket" => { + "name"=> "test_bucket" } } }