Skip to content

Commit

Permalink
Add Zstd compression support to S3 plugin (#439)
Browse files Browse the repository at this point in the history
adds support for Zstd compression in the Fluentd S3 plugin.

Changes

- Implemented Zstd compression using the `zstd-ruby` library.
- Introduced the `ZstdCompressor` class to handle log compression before uploading to S3.
- Updated the example configuration to demonstrate the use of `store_as zstd`.
- Ensured that the `Zstd` module is properly loaded to avoid uninitialized constant errors.

Why this feature?

Zstd compression provides a better compression ratio and performance compared to gzip, making it a valuable option for users who want efficient log storage on S3.

---------

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: yongwoo.kim <[email protected]>
Signed-off-by: ddukbg <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: yongwoo.kim <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
4 people authored Nov 6, 2024
1 parent 803cac2 commit 66c8d72
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
env:
CI: true
run: |
gem install bundler rake
gem install rake
bundle install --jobs 4 --retry 3
bundle exec rake test
1 change: 1 addition & 0 deletions fluent-plugin-s3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.14.22", "< 2"]
gem.add_dependency "aws-sdk-s3", "~> 1.60"
gem.add_dependency "aws-sdk-sqs", "~> 1.23"
gem.add_dependency 'zstd-ruby'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", ">= 1.0.3"
Expand Down
30 changes: 30 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
require 'zstd-ruby'

module Fluent::Plugin
class S3Output
class ZstdCompressor < Compressor
S3Output.register_compressor('zstd', self)

config_section :compress, param_name: :compress_config, init: true, multi: false do
desc "Compression level for zstd (1-22)"
config_param :level, :integer, default: 3
end

def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
compressed = Zstd.compress(chunk.read, level: @compress_config.level)
tmp.write(compressed)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise
end
end
end
end
39 changes: 39 additions & 0 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def test_configure_with_mime_type_lzo
assert(e.is_a?(Fluent::ConfigError))
end

data('level default' => nil,
'level 1' => 1)
def test_configure_with_mime_type_zstd(level)
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
conf << "\n<compress>\nlevel #{level}\n</compress>\n" if level
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
assert_equal (level || 3), d.instance.instance_variable_get(:@compressor).instance_variable_get(:@compress_config).level
end

def test_configure_with_path_style
conf = CONFIG.clone
conf << "\nforce_path_style true\n"
Expand Down Expand Up @@ -456,6 +468,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde
FileUtils.rm_f(s3_local_file_path)
end

def test_write_with_zstd
setup_mocks(true)
s3_local_file_path = "/tmp/s3-test.zst"

expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst"

setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path)

config = CONFIG_TIME_SLICE + "\nstore_as zstd\n"
d = create_time_sliced_driver(config)

time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: "test") do
d.feed(time, { "a" => 1 })
d.feed(time, { "a" => 2 })
end

File.open(s3_local_file_path, 'rb') do |file|
compressed_data = file.read
uncompressed_data = Zstd.decompress(compressed_data)
expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
assert_equal expected_data, uncompressed_data
end
FileUtils.rm_f(s3_local_file_path)
end

class MockResponse
attr_reader :data

Expand Down

0 comments on commit 66c8d72

Please sign in to comment.