Skip to content

Commit

Permalink
fix: fix bug caused by incorrectly appending compressing file
Browse files Browse the repository at this point in the history
Signed-off-by: arkbriar <[email protected]>
  • Loading branch information
arkbriar committed Aug 21, 2024
1 parent f9f68a5 commit 84ab117
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions internal/filechannel/filechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,33 +1024,20 @@ func (fc *FileChannel) Open() error {
return nil
}

func (fc *FileChannel) compress(ctx context.Context, index uint32) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if !fc.segmentManager.Pin(index) {
return fmt.Errorf("failed to pin segment index %d", index)
}
defer fc.segmentManager.Unpin(index)

plainFile := fc.segmentManager.SegmentFile(index, Plain)
compressingFile := fc.segmentManager.SegmentFile(index, Compressing)

stats, err := os.Stat(plainFile)
func (fc *FileChannel) compressInner(index uint32, from, to string) error {
stats, err := os.Stat(from)
if err != nil {
return fmt.Errorf("failed to get stats of plain segment file: %w", err)
}

f, err := fs.OpenSequentialFile(plainFile)
f, err := fs.OpenSequentialFile(from)
if err != nil {
return fmt.Errorf("failed to open plain segment file: %w", err)
}
defer f.Close()

wf, err := fs.OpenFile(compressingFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
// Truncate before compressing.
wf, err := fs.OpenFile(to, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to open compressing segment file: %w", err)
}
Expand Down Expand Up @@ -1089,6 +1076,30 @@ func (fc *FileChannel) compress(ctx context.Context, index uint32) error {
return fmt.Errorf("failed to compress: %w", err)
}

return nil
}

func (fc *FileChannel) compress(ctx context.Context, index uint32) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if !fc.segmentManager.Pin(index) {
return fmt.Errorf("failed to pin segment index %d", index)
}
defer fc.segmentManager.Unpin(index)

plainFile := fc.segmentManager.SegmentFile(index, Plain)
compressingFile := fc.segmentManager.SegmentFile(index, Compressing)

// Compress.
err := fc.compressInner(index, plainFile, compressingFile)
if err != nil {
return fmt.Errorf("failed to compress: %w", err)
}

// Rename and delete the original file.
err = os.Rename(compressingFile, fc.segmentManager.SegmentFile(index, Compressed))
if err != nil {
Expand Down

0 comments on commit 84ab117

Please sign in to comment.