Skip to content

Commit

Permalink
sebtopic/s3: fix s3 errors not being propagated
Browse files Browse the repository at this point in the history
This commit fixes an error where the return value of calls to Close()
were not used. This was a major problem, since files were uploaded to S3
in a call to S3Storage writer's Close().

An easy way to recreate the bug was to start Seb with credentials that
were valid only for long enough to initialize a given topic (allowing
Seb to list files in order to understand the current state) and then add
a new record once the credentials had expired.
  • Loading branch information
micvbang committed Jul 3, 2024
1 parent 79d9be7 commit f3769f8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
29 changes: 29 additions & 0 deletions internal/sebtopic/s3storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ func TestS3WriteToS3(t *testing.T) {
require.True(t, s3Mock.PutObjectCalled)
}

// TestS3WriteToS3WithError verifies that an error is returned when uploading to
// S3 fails.
func TestS3WriteToS3WithError(t *testing.T) {
bucketName := "mybucket"
recordBatchPath := "topicName/000123.record_batch"
randomBytes := []byte(stringy.RandomN(500))

expectedErr := fmt.Errorf("PutObject failed")
s3Mock := &tester.S3Mock{}
s3Mock.MockPutObject = func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
return nil, expectedErr
}

s3Storage := sebtopic.NewS3Storage(log, s3Mock, bucketName, "")

// Act
rbWriter, err := s3Storage.Writer(recordBatchPath)
require.NoError(t, err)

n, err := rbWriter.Write(randomBytes)
require.NoError(t, err)
require.Equal(t, len(randomBytes), n)

// file should be written to s3 when it's closed
err = rbWriter.Close()
require.ErrorIs(t, err, expectedErr)
require.True(t, s3Mock.PutObjectCalled)
}

// TestS3WriteWithPrefix verifies that the given prefix is used when calling
// S3's PutObject.
func TestS3WriteWithPrefix(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions internal/sebtopic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,17 @@ func (s *Topic) AddRecords(records []sebrecords.Record) ([]uint64, error) {
}

if s.compression != nil {
w.Close()
err = w.Close()
if err != nil {
return nil, fmt.Errorf("closing compression writer: %w", err)
}
}
// once Close() returns, the data has been committed and can be retrieved by
// ReadRecord.
backingWriter.Close()
err = backingWriter.Close()
if err != nil {
return nil, fmt.Errorf("closing backing writer: %w", err)
}

nextOffset := recordBatchID + uint64(len(records))

Expand Down
75 changes: 75 additions & 0 deletions internal/sebtopic/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sebtopic_test

import (
"context"
"fmt"
"io"
"testing"
"time"

Expand Down Expand Up @@ -68,6 +70,79 @@ func TestStorageWriteRecordBatchSingleBatch(t *testing.T) {
})
}

// TestStorageWriteRecordsBackingStorageWriteFails verifies that an error is
// propagated when a backing storage Writer's Write() fails.
func TestStorageWriteRecordsBackingStorageWriteFails(t *testing.T) {
tester.TestCacheStorage(t, func(t *testing.T, cacheStorage sebcache.Storage) {
expectedErr := fmt.Errorf("failed to write to file")

backingStorage := &tester.MockTopicStorage{}
backingStorage.ListFilesMock = func(topicName, extension string) ([]sebtopic.File, error) {
return nil, nil
}
backingStorage.WriterMock = func(recordBatchPath string) (io.WriteCloser, error) {
return &tester.MockWriteCloser{
WriteMock: func(p []byte) (n int, err error) {
return 0, expectedErr
},
}, nil
}

cache, err := sebcache.New(log, cacheStorage)
require.NoError(t, err)

s, err := sebtopic.New(log, backingStorage, "mytopic", cache, sebtopic.WithCompress(nil))
require.NoError(t, err)

records := tester.MakeRandomRecords(5)

// Act
offsets, err := s.AddRecords(records)

// Assert
require.ErrorIs(t, err, expectedErr)
require.Equal(t, 0, len(offsets))
})
}

// TestStorageWriteRecordsBackingStorageCloseFails verifies that an error is
// propagated when a backing storage Writer's Close() fails.
func TestStorageWriteRecordsBackingStorageCloseFails(t *testing.T) {
tester.TestCacheStorage(t, func(t *testing.T, cacheStorage sebcache.Storage) {
expectedErr := fmt.Errorf("failed to close file")

backingStorage := &tester.MockTopicStorage{}
backingStorage.ListFilesMock = func(topicName, extension string) ([]sebtopic.File, error) {
return nil, nil
}
backingStorage.WriterMock = func(recordBatchPath string) (io.WriteCloser, error) {
return &tester.MockWriteCloser{
WriteMock: func(p []byte) (n int, err error) {
return len(p), nil
},
CloseMock: func() error {
return expectedErr
},
}, nil
}

cache, err := sebcache.New(log, cacheStorage)
require.NoError(t, err)

s, err := sebtopic.New(log, backingStorage, "mytopic", cache, sebtopic.WithCompress(nil))
require.NoError(t, err)

records := tester.MakeRandomRecords(5)

// Act
offsets, err := s.AddRecords(records)

// Assert
require.ErrorIs(t, err, expectedErr)
require.Equal(t, 0, len(offsets))
})
}

// TestStorageWriteRecordBatchMultipleBatches verifies that multiple
// RecordBatches can be written to the underlying storage and be read back
// again, and that reading beyond the number of existing records yields
Expand Down

0 comments on commit f3769f8

Please sign in to comment.