From 84ab117add98ac32252e6b294842e3f9568dc39b Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 21 Aug 2024 14:21:49 +0800 Subject: [PATCH] fix: fix bug caused by incorrectly appending compressing file Signed-off-by: arkbriar --- internal/filechannel/filechannel.go | 47 ++++++++++++++++++----------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/internal/filechannel/filechannel.go b/internal/filechannel/filechannel.go index 0b1889f..c40fa44 100644 --- a/internal/filechannel/filechannel.go +++ b/internal/filechannel/filechannel.go @@ -1024,33 +1024,20 @@ func (fc *FileChannel) Open() error { return nil } -func (fc *FileChannel) compress(ctx context.Context, index uint32) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if !fc.segmentManager.Pin(index) { - return fmt.Errorf("failed to pin segment index %d", index) - } - defer fc.segmentManager.Unpin(index) - - plainFile := fc.segmentManager.SegmentFile(index, Plain) - compressingFile := fc.segmentManager.SegmentFile(index, Compressing) - - stats, err := os.Stat(plainFile) +func (fc *FileChannel) compressInner(index uint32, from, to string) error { + stats, err := os.Stat(from) if err != nil { return fmt.Errorf("failed to get stats of plain segment file: %w", err) } - f, err := fs.OpenSequentialFile(plainFile) + f, err := fs.OpenSequentialFile(from) if err != nil { return fmt.Errorf("failed to open plain segment file: %w", err) } defer f.Close() - wf, err := fs.OpenFile(compressingFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + // Truncate before compressing. + wf, err := fs.OpenFile(to, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return fmt.Errorf("failed to open compressing segment file: %w", err) } @@ -1089,6 +1076,30 @@ func (fc *FileChannel) compress(ctx context.Context, index uint32) error { return fmt.Errorf("failed to compress: %w", err) } + return nil +} + +func (fc *FileChannel) compress(ctx context.Context, index uint32) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if !fc.segmentManager.Pin(index) { + return fmt.Errorf("failed to pin segment index %d", index) + } + defer fc.segmentManager.Unpin(index) + + plainFile := fc.segmentManager.SegmentFile(index, Plain) + compressingFile := fc.segmentManager.SegmentFile(index, Compressing) + + // Compress. + err := fc.compressInner(index, plainFile, compressingFile) + if err != nil { + return fmt.Errorf("failed to compress: %w", err) + } + // Rename and delete the original file. err = os.Rename(compressingFile, fc.segmentManager.SegmentFile(index, Compressed)) if err != nil {