From f3769f86501754f169f3c7386361b4a465a90db0 Mon Sep 17 00:00:00 2001 From: Michael Bang Date: Wed, 3 Jul 2024 10:01:30 +0200 Subject: [PATCH] sebtopic/s3: fix s3 errors not being propagated This commit fixes an error where the return value of calls to Close() were not used. This was a major problem, since files were uploaded to S3 in a call to S3Storage writer's Close(). An easy way to recreate the bug was to start Seb with credentials that were valid only for long enough to initialize a given topic (allowing Seb to list files in order to understand the current state) and then add a new record once the credentials had expired. --- internal/sebtopic/s3storage_test.go | 29 +++++++++++ internal/sebtopic/topic.go | 10 +++- internal/sebtopic/topic_test.go | 75 +++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/internal/sebtopic/s3storage_test.go b/internal/sebtopic/s3storage_test.go index 39c4905..939b42c 100644 --- a/internal/sebtopic/s3storage_test.go +++ b/internal/sebtopic/s3storage_test.go @@ -60,6 +60,35 @@ func TestS3WriteToS3(t *testing.T) { require.True(t, s3Mock.PutObjectCalled) } +// TestS3WriteToS3WithError verifies that an error is returned when uploading to +// S3 fails. +func TestS3WriteToS3WithError(t *testing.T) { + bucketName := "mybucket" + recordBatchPath := "topicName/000123.record_batch" + randomBytes := []byte(stringy.RandomN(500)) + + expectedErr := fmt.Errorf("PutObject failed") + s3Mock := &tester.S3Mock{} + s3Mock.MockPutObject = func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + return nil, expectedErr + } + + s3Storage := sebtopic.NewS3Storage(log, s3Mock, bucketName, "") + + // Act + rbWriter, err := s3Storage.Writer(recordBatchPath) + require.NoError(t, err) + + n, err := rbWriter.Write(randomBytes) + require.NoError(t, err) + require.Equal(t, len(randomBytes), n) + + // file should be written to s3 when it's closed + err = rbWriter.Close() + require.ErrorIs(t, err, expectedErr) + require.True(t, s3Mock.PutObjectCalled) +} + // TestS3WriteWithPrefix verifies that the given prefix is used when calling // S3's PutObject. func TestS3WriteWithPrefix(t *testing.T) { diff --git a/internal/sebtopic/topic.go b/internal/sebtopic/topic.go index cf227c0..4c6bf97 100644 --- a/internal/sebtopic/topic.go +++ b/internal/sebtopic/topic.go @@ -121,11 +121,17 @@ func (s *Topic) AddRecords(records []sebrecords.Record) ([]uint64, error) { } if s.compression != nil { - w.Close() + err = w.Close() + if err != nil { + return nil, fmt.Errorf("closing compression writer: %w", err) + } } // once Close() returns, the data has been committed and can be retrieved by // ReadRecord. - backingWriter.Close() + err = backingWriter.Close() + if err != nil { + return nil, fmt.Errorf("closing backing writer: %w", err) + } nextOffset := recordBatchID + uint64(len(records)) diff --git a/internal/sebtopic/topic_test.go b/internal/sebtopic/topic_test.go index c08c676..dea097c 100644 --- a/internal/sebtopic/topic_test.go +++ b/internal/sebtopic/topic_test.go @@ -2,6 +2,8 @@ package sebtopic_test import ( "context" + "fmt" + "io" "testing" "time" @@ -68,6 +70,79 @@ func TestStorageWriteRecordBatchSingleBatch(t *testing.T) { }) } +// TestStorageWriteRecordsBackingStorageWriteFails verifies that an error is +// propagated when a backing storage Writer's Write() fails. +func TestStorageWriteRecordsBackingStorageWriteFails(t *testing.T) { + tester.TestCacheStorage(t, func(t *testing.T, cacheStorage sebcache.Storage) { + expectedErr := fmt.Errorf("failed to write to file") + + backingStorage := &tester.MockTopicStorage{} + backingStorage.ListFilesMock = func(topicName, extension string) ([]sebtopic.File, error) { + return nil, nil + } + backingStorage.WriterMock = func(recordBatchPath string) (io.WriteCloser, error) { + return &tester.MockWriteCloser{ + WriteMock: func(p []byte) (n int, err error) { + return 0, expectedErr + }, + }, nil + } + + cache, err := sebcache.New(log, cacheStorage) + require.NoError(t, err) + + s, err := sebtopic.New(log, backingStorage, "mytopic", cache, sebtopic.WithCompress(nil)) + require.NoError(t, err) + + records := tester.MakeRandomRecords(5) + + // Act + offsets, err := s.AddRecords(records) + + // Assert + require.ErrorIs(t, err, expectedErr) + require.Equal(t, 0, len(offsets)) + }) +} + +// TestStorageWriteRecordsBackingStorageCloseFails verifies that an error is +// propagated when a backing storage Writer's Close() fails. +func TestStorageWriteRecordsBackingStorageCloseFails(t *testing.T) { + tester.TestCacheStorage(t, func(t *testing.T, cacheStorage sebcache.Storage) { + expectedErr := fmt.Errorf("failed to close file") + + backingStorage := &tester.MockTopicStorage{} + backingStorage.ListFilesMock = func(topicName, extension string) ([]sebtopic.File, error) { + return nil, nil + } + backingStorage.WriterMock = func(recordBatchPath string) (io.WriteCloser, error) { + return &tester.MockWriteCloser{ + WriteMock: func(p []byte) (n int, err error) { + return len(p), nil + }, + CloseMock: func() error { + return expectedErr + }, + }, nil + } + + cache, err := sebcache.New(log, cacheStorage) + require.NoError(t, err) + + s, err := sebtopic.New(log, backingStorage, "mytopic", cache, sebtopic.WithCompress(nil)) + require.NoError(t, err) + + records := tester.MakeRandomRecords(5) + + // Act + offsets, err := s.AddRecords(records) + + // Assert + require.ErrorIs(t, err, expectedErr) + require.Equal(t, 0, len(offsets)) + }) +} + // TestStorageWriteRecordBatchMultipleBatches verifies that multiple // RecordBatches can be written to the underlying storage and be read back // again, and that reading beyond the number of existing records yields