From 7f6e51c23863ba6bac53b8f11e76b16a73ff8623 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:42:24 +0900 Subject: [PATCH 1/3] Remove duplicate ZstdCompressor class as per maintainer's comments Signed-off-by: ddukbg --- lib/fluent/plugin/s3_compressor_zstd.rb | 37 ------------------------- 1 file changed, 37 deletions(-) delete mode 100644 lib/fluent/plugin/s3_compressor_zstd.rb diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb deleted file mode 100644 index cb20a8c..0000000 --- a/lib/fluent/plugin/s3_compressor_zstd.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'zstd-ruby' - -module Fluent::Plugin - class S3Output - class ZstdCompressor < Compressor - S3Output.register_compressor('zstd', self) - - config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)" - - def initialize(opts = {}) - super() - @buffer_type = opts[:buffer_type] - @log = opts[:log] - end - - def ext - 'zst'.freeze - end - - def content_type - 'application/x-zstd'.freeze - end - - def compress(chunk, tmp) - uncompressed_data = '' - chunk.open do |io| - uncompressed_data = io.read - end - compressed_data = Zstd.compress(uncompressed_data, level: @level) - tmp.write(compressed_data) - rescue => e - log.warn "zstd compression failed: #{e.message}" - raise e - end - end - end -end From 6aa84f43cfec7d706bb81ca8aea4aec4cfc20885 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:45:22 +0900 Subject: [PATCH 2/3] Remove ZstdCompressor tests from test_in_s3.rb as per maintainer's comments Moved ZstdCompressor tests from test_in_s3.rb to test_out_s3.rb as they relate to the out_s3 plugin. Signed-off-by: ddukbg --- test/test_in_s3.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb index dde048e..2bd2d9d 100644 --- a/test/test_in_s3.rb +++ b/test/test_in_s3.rb @@ -93,7 +93,6 @@ def test_unknown_store_as "text" => ["text", "txt", "text/plain"], "gzip" => ["gzip", "gz", "application/x-gzip"], "gzip_command" => ["gzip_command", "gz", "application/x-gzip"], - "zstd" => ["zstd", "zst", "application/x-zstd"], "lzo" => ["lzo", "lzo", "application/x-lzop"], "lzma2" => ["lzma2", "xz", "application/x-xz"]) def test_extractor(data) From dd3bafb3bc7f0c0e4cb9d7ac26d948efdef38414 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:45:56 +0900 Subject: [PATCH 3/3] Add ZstdCompressor test cases to test_out_s3.rb as per maintainer's comments Added tests for ZstdCompressor to test_out_s3.rb following the maintainer's suggestions. Signed-off-by: ddukbg --- test/test_out_s3.rb | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index cbf7860..1f62b10 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -109,6 +109,14 @@ def test_configure_with_mime_type_lzo assert(e.is_a?(Fluent::ConfigError)) end + def test_configure_with_mime_type_zstd + conf = CONFIG.clone + conf << "\nstore_as zstd\n" + d = create_driver(conf) + assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext + assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type + end + def test_configure_with_path_style conf = CONFIG.clone conf << "\nforce_path_style true\n" @@ -456,6 +464,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde FileUtils.rm_f(s3_local_file_path) end + def test_write_with_zstd + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.zst" + + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" + + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) + + config = CONFIG_TIME_SLICE + "\nstore_as zstd\n" + d = create_time_sliced_driver(config) + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + + File.open(s3_local_file_path, 'rb') do |file| + compressed_data = file.read + uncompressed_data = Zstd.decompress(compressed_data) + expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + assert_equal expected_data, uncompressed_data + end + FileUtils.rm_f(s3_local_file_path) + end + class MockResponse attr_reader :data