From 593fc13ff7c5aa8a6fed58afeb859641ae73e03b Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 27 Dec 2023 20:05:41 +0000
Subject: [PATCH] sstable: flush value blocks if 128 are buffered

---
 sstable/options.go                   |  13 ++
 sstable/testdata/writer_value_blocks | 315 +++++++++++++++++++++++++++
 sstable/value_block.go               |  78 +++++--
 sstable/write_queue.go               |  20 +-
 sstable/writer.go                    |  13 +-
 sstable/writer_test.go               |   7 +-
 6 files changed, 417 insertions(+), 29 deletions(-)

diff --git a/sstable/options.go b/sstable/options.go
index b7ef59f98e..86d0af9a5a 100644
--- a/sstable/options.go
+++ b/sstable/options.go
@@ -267,6 +267,16 @@ type WriterOptions struct {
 	// RequiredInPlaceValueBound mirrors
 	// Options.Experimental.RequiredInPlaceValueBound.
 	RequiredInPlaceValueBound UserKeyPrefixBound
+
+	// ValueBlockBufferLimit is the number of value blocks to buffer in-memory
+	// before flushing them to the underlying writer. Buffering these blocks and
+	// flushing them in groups, rather than interleaved block-by-block with data
+	// blocks, potentially improves locality of scans over data blocks in the
+	// presence of prefetching/read-ahead, page caching, etc.
+	//
+	// A value of 0 implies the default of max(16, 8MB/BlockSize), while a
+	// negative value disables buffering entirely.
+	ValueBlockBufferLimit int
 }
 
 func (o WriterOptions) ensureDefaults() WriterOptions {
@@ -288,6 +298,9 @@ func (o WriterOptions) ensureDefaults() WriterOptions {
 	if o.IndexBlockSize <= 0 {
 		o.IndexBlockSize = o.BlockSize
 	}
+	if o.ValueBlockBufferLimit == 0 {
+		o.ValueBlockBufferLimit = max(16, 8<<20/o.BlockSize)
+	}
 	if o.MergerName == "" {
 		o.MergerName = base.DefaultMerger.Name
 	}
diff --git a/sstable/testdata/writer_value_blocks b/sstable/testdata/writer_value_blocks
index e98a4b5527..6562e0a0d2 100644
--- a/sstable/testdata/writer_value_blocks
+++ b/sstable/testdata/writer_value_blocks
@@ -328,3 +328,318 @@ layout
  787  version: 4
  791  magic number: 0xf09faab3f09faab3
  799  EOF
+
+# Show that a value block buffer limit of 2 causes value blocks to be written
+# in groups of 2 in the middle of the file.
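+# With buf-limit=2, each time a third value block is finished the two buffered
+# blocks are written out together, producing the adjacent value-block pairs
+# visible in the layout below.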
+build block-size=8 buf-limit=2 +blue@10.SET.20:blue10 +blue@8.SET.18:blue8 +blue@8.SET.16:blue8s +blue@6.SET.15:blue6isverylong +blue@6.SET.14:blue6isverylong1 +blue@6.SET.13:blue6isverylong1 +blue@6.SET.12:blue6isverylong1 +blue@6.SET.11:blue6isverylong1 +---- +value-blocks: num-values 7, num-blocks: 6, size: 149 + +layout +---- + 0 data (33) + 0 record (25 = 3 [0] + 15 + 7) [restart] + blue@10#20,1:blue10 + 25 [restart 0] + 33 [trailer compression=none checksum=0x5fb0d551] + 38 data (29) + 38 record (21 = 3 [0] + 14 + 4) [restart] + blue@8#18,1:value handle {valueLen:5 blockNum:0 offsetInBlock:0} + 59 [restart 38] + 67 [trailer compression=none checksum=0x628e4a10] + 72 data (29) + 72 record (21 = 3 [0] + 14 + 4) [restart] + blue@8#16,1:value handle {valueLen:6 blockNum:0 offsetInBlock:5} + 93 [restart 72] + 101 [trailer compression=none checksum=0x4e65b9b6] + 106 data (29) + 106 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#15,1:value handle {valueLen:15 blockNum:1 offsetInBlock:0} + 127 [restart 106] + 135 [trailer compression=none checksum=0xc992640e] + 140 value-block (11) + 156 value-block (15) + 176 data (29) + 176 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#14,1:value handle {valueLen:16 blockNum:2 offsetInBlock:0} + 197 [restart 176] + 205 [trailer compression=none checksum=0x62a8bb33] + 210 data (29) + 210 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#13,1:value handle {valueLen:16 blockNum:3 offsetInBlock:0} + 231 [restart 210] + 239 [trailer compression=none checksum=0xc0ab3808] + 244 value-block (16) + 265 value-block (16) + 286 data (29) + 286 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#12,1:value handle {valueLen:16 blockNum:4 offsetInBlock:0} + 307 [restart 286] + 315 [trailer compression=none checksum=0xec7ee24d] + 320 data (29) + 320 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#11,1:value handle {valueLen:16 blockNum:5 offsetInBlock:0} + 341 [restart 320] + 349 [trailer compression=none checksum=0xcca2bad9] + 354 index (28) + 354 block:0/33 [restart] + 374 [restart 354] + 382 [trailer compression=none checksum=0x32b37f08] + 387 index (27) + 387 block:38/29 [restart] + 406 [restart 387] + 414 [trailer compression=none checksum=0x21d27815] + 419 index (30) + 419 block:72/29 [restart] + 441 [restart 419] + 449 [trailer compression=none checksum=0xba0b26fe] + 454 index (27) + 454 block:106/29 [restart] + 473 [restart 454] + 481 [trailer compression=none checksum=0xf2c4e3d7] + 486 index (31) + 486 block:176/29 [restart] + 509 [restart 486] + 517 [trailer compression=none checksum=0x1c2b03b0] + 522 index (31) + 522 block:210/29 [restart] + 545 [restart 522] + 553 [trailer compression=none checksum=0x5121de43] + 558 index (31) + 558 block:286/29 [restart] + 581 [restart 558] + 589 [trailer compression=none checksum=0x5670ba6d] + 594 index (26) + 594 block:320/29 [restart] + 612 [restart 594] + 620 [trailer compression=none checksum=0x5d0eec20] + 625 top-index (151) + 625 block:354/28 [restart] + 646 block:387/27 [restart] + 666 block:419/30 [restart] + 689 block:454/27 [restart] + 709 block:486/31 [restart] + 732 block:522/31 [restart] + 755 block:558/31 [restart] + 778 block:594/26 [restart] + 796 [restart 625] + 800 [restart 646] + 804 [restart 666] + 808 [restart 689] + 812 [restart 709] + 816 [restart 732] + 820 [restart 755] + 824 [restart 778] + 776 [trailer compression=snappy checksum=0x1a8319dc] + 781 value-block (16) + 802 value-block (16) + 823 value-index (24) + 852 properties (678) + 852 obsolete-key (16) [restart] + 868 pebble.num.value-blocks (27) 
+ 895 pebble.num.values.in.value-blocks (21) + 916 pebble.value-blocks.size (22) + 938 rocksdb.block.based.table.index.type (43) + 981 rocksdb.block.based.table.prefix.filtering (20) + 1001 rocksdb.block.based.table.whole.key.filtering (23) + 1024 rocksdb.comparator (37) + 1061 rocksdb.compression (16) + 1077 rocksdb.compression_options (106) + 1183 rocksdb.data.size (14) + 1197 rocksdb.deleted.keys (15) + 1212 rocksdb.external_sst_file.global_seqno (41) + 1253 rocksdb.external_sst_file.version (14) + 1267 rocksdb.filter.size (15) + 1282 rocksdb.index.partitions (20) + 1302 rocksdb.index.size (9) + 1311 rocksdb.merge.operands (18) + 1329 rocksdb.merge.operator (24) + 1353 rocksdb.num.data.blocks (19) + 1372 rocksdb.num.entries (11) + 1383 rocksdb.num.range-deletions (19) + 1402 rocksdb.prefix.extractor.name (31) + 1433 rocksdb.property.collectors (34) + 1467 rocksdb.raw.key.size (16) + 1483 rocksdb.raw.value.size (14) + 1497 rocksdb.top-level.index.size (25) + 1522 [restart 852] + 1530 [trailer compression=none checksum=0xe690121f] + 1535 meta-index (64) + 1535 pebble.value_index block:823/24 value-blocks-index-lengths: 1(num), 2(offset), 1(length) [restart] + 1562 rocksdb.properties block:852/678 [restart] + 1587 [restart 1535] + 1591 [restart 1562] + 1599 [trailer compression=none checksum=0x98d2a4dd] + 1604 footer (53) + 1604 checksum type: crc32c + 1605 meta: offset=1535, length=64 + 1608 index: offset=625, length=151 + 1612 [padding] + 1645 version: 4 + 1649 magic number: 0xf09faab3f09faab3 + 1657 EOF + +# Show val block buffering limit of 1 flushing every block. +build block-size=8 buf-limit=1 +blue@10.SET.20:blue10 +blue@8.SET.18:blue8 +blue@8.SET.16:blue8s +blue@6.SET.15:blue6isverylong +blue@6.SET.14:blue6isverylong1 +blue@6.SET.13:blue6isverylong1 +blue@6.SET.12:blue6isverylong1 +blue@6.SET.11:blue6isverylong1 +---- +value-blocks: num-values 7, num-blocks: 6, size: 149 + + +layout +---- + 0 data (33) + 0 record (25 = 3 [0] + 15 + 7) [restart] + blue@10#20,1:blue10 + 25 [restart 0] + 33 [trailer compression=none checksum=0x5fb0d551] + 38 data (29) + 38 record (21 = 3 [0] + 14 + 4) [restart] + blue@8#18,1:value handle {valueLen:5 blockNum:0 offsetInBlock:0} + 59 [restart 38] + 67 [trailer compression=none checksum=0x628e4a10] + 72 data (29) + 72 record (21 = 3 [0] + 14 + 4) [restart] + blue@8#16,1:value handle {valueLen:6 blockNum:0 offsetInBlock:5} + 93 [restart 72] + 101 [trailer compression=none checksum=0x4e65b9b6] + 106 value-block (11) + 122 data (29) + 122 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#15,1:value handle {valueLen:15 blockNum:1 offsetInBlock:0} + 143 [restart 122] + 151 [trailer compression=none checksum=0xc992640e] + 156 value-block (15) + 176 data (29) + 176 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#14,1:value handle {valueLen:16 blockNum:2 offsetInBlock:0} + 197 [restart 176] + 205 [trailer compression=none checksum=0x62a8bb33] + 210 value-block (16) + 231 data (29) + 231 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#13,1:value handle {valueLen:16 blockNum:3 offsetInBlock:0} + 252 [restart 231] + 260 [trailer compression=none checksum=0xc0ab3808] + 265 value-block (16) + 286 data (29) + 286 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#12,1:value handle {valueLen:16 blockNum:4 offsetInBlock:0} + 307 [restart 286] + 315 [trailer compression=none checksum=0xec7ee24d] + 320 data (29) + 320 record (21 = 3 [0] + 14 + 4) [restart] + blue@6#11,1:value handle {valueLen:16 blockNum:5 offsetInBlock:0} + 341 [restart 320] + 349 [trailer compression=none 
checksum=0xcca2bad9] + 354 index (28) + 354 block:0/33 [restart] + 374 [restart 354] + 382 [trailer compression=none checksum=0x32b37f08] + 387 index (27) + 387 block:38/29 [restart] + 406 [restart 387] + 414 [trailer compression=none checksum=0x21d27815] + 419 index (30) + 419 block:72/29 [restart] + 441 [restart 419] + 449 [trailer compression=none checksum=0xba0b26fe] + 454 index (27) + 454 block:122/29 [restart] + 473 [restart 454] + 481 [trailer compression=none checksum=0xcd162eb6] + 486 index (31) + 486 block:176/29 [restart] + 509 [restart 486] + 517 [trailer compression=none checksum=0x1c2b03b0] + 522 index (31) + 522 block:231/29 [restart] + 545 [restart 522] + 553 [trailer compression=none checksum=0xa8453ba7] + 558 index (31) + 558 block:286/29 [restart] + 581 [restart 558] + 589 [trailer compression=none checksum=0x5670ba6d] + 594 index (26) + 594 block:320/29 [restart] + 612 [restart 594] + 620 [trailer compression=none checksum=0x5d0eec20] + 625 top-index (151) + 625 block:354/28 [restart] + 646 block:387/27 [restart] + 666 block:419/30 [restart] + 689 block:454/27 [restart] + 709 block:486/31 [restart] + 732 block:522/31 [restart] + 755 block:558/31 [restart] + 778 block:594/26 [restart] + 796 [restart 625] + 800 [restart 646] + 804 [restart 666] + 808 [restart 689] + 812 [restart 709] + 816 [restart 732] + 820 [restart 755] + 824 [restart 778] + 776 [trailer compression=snappy checksum=0x1a8319dc] + 781 value-block (16) + 802 value-block (16) + 823 value-index (24) + 852 properties (678) + 852 obsolete-key (16) [restart] + 868 pebble.num.value-blocks (27) + 895 pebble.num.values.in.value-blocks (21) + 916 pebble.value-blocks.size (22) + 938 rocksdb.block.based.table.index.type (43) + 981 rocksdb.block.based.table.prefix.filtering (20) + 1001 rocksdb.block.based.table.whole.key.filtering (23) + 1024 rocksdb.comparator (37) + 1061 rocksdb.compression (16) + 1077 rocksdb.compression_options (106) + 1183 rocksdb.data.size (14) + 1197 rocksdb.deleted.keys (15) + 1212 rocksdb.external_sst_file.global_seqno (41) + 1253 rocksdb.external_sst_file.version (14) + 1267 rocksdb.filter.size (15) + 1282 rocksdb.index.partitions (20) + 1302 rocksdb.index.size (9) + 1311 rocksdb.merge.operands (18) + 1329 rocksdb.merge.operator (24) + 1353 rocksdb.num.data.blocks (19) + 1372 rocksdb.num.entries (11) + 1383 rocksdb.num.range-deletions (19) + 1402 rocksdb.prefix.extractor.name (31) + 1433 rocksdb.property.collectors (34) + 1467 rocksdb.raw.key.size (16) + 1483 rocksdb.raw.value.size (14) + 1497 rocksdb.top-level.index.size (25) + 1522 [restart 852] + 1530 [trailer compression=none checksum=0xe690121f] + 1535 meta-index (64) + 1535 pebble.value_index block:823/24 value-blocks-index-lengths: 1(num), 2(offset), 1(length) [restart] + 1562 rocksdb.properties block:852/678 [restart] + 1587 [restart 1535] + 1591 [restart 1562] + 1599 [trailer compression=none checksum=0x98d2a4dd] + 1604 footer (53) + 1604 checksum type: crc32c + 1605 meta: offset=1535, length=64 + 1608 index: offset=625, length=151 + 1612 [padding] + 1645 version: 4 + 1649 magic number: 0xf09faab3f09faab3 + 1657 EOF diff --git a/sstable/value_block.go b/sstable/value_block.go index ff8b05cf4f..0f03026143 100644 --- a/sstable/value_block.go +++ b/sstable/value_block.go @@ -135,10 +135,12 @@ import ( // compressed. An uncompressed value block is a sequence of values with no // separator or length (we rely on the valueHandle to demarcate). The // valueHandle.offsetInBlock points to the value, of length -// valueHandle.valueLen. 
While writing a sstable, all the (possibly
-// compressed) value blocks need to be held in-memory until they can be
-// written. Value blocks are placed after the "meta rangedel" and "meta range
-// key" blocks since value blocks are considered less likely to be read.
+// valueHandle.valueLen. While writing a sstable, at most a configured number
+// of (possibly compressed) value blocks is buffered in-memory; once the limit
+// is reached, buffered blocks are flushed to the writer as one contiguous
+// group. Any value blocks remaining when the file is finished, along with the
+// value blocks index, are placed after the "meta rangedel" and "meta range
+// key" blocks. Writing value blocks in groups improves data block locality.
 //
 // Meta Value Index Block:
 // Since the (key, valueHandle) pair are written before there is any knowledge
@@ -386,8 +388,8 @@ type valueBlocksAndIndexStats struct {
 // valueBlockWriter writes a sequence of value blocks, and the value blocks
 // index, for a sstable.
 type valueBlockWriter struct {
-	// The configured uncompressed block size and size threshold
-	blockSize, blockSizeThreshold int
+	// The configured uncompressed block size, size threshold, and buffered-block limit.
+	blockSize, blockSizeThreshold, maxBufferedBlocks int
 	// Configured compression.
 	compression Compression
 	// checksummer with configured checksum type.
@@ -404,6 +406,10 @@ type valueBlockWriter struct {
 	// Cumulative value block bytes written so far.
 	totalBlockBytes uint64
 	numValues       uint64
+	// queue's writer and writeMu fields are used to flush buffered blocks.
+	queue *writeQueue
+	// unflushedIdx is the first index in `blocks` that has not been flushed.
+	unflushedIdx int
 }
 
 type blockAndHandle struct {
@@ -432,6 +438,9 @@ var compressedValueBlockBufPool = sync.Pool{
 }
 
 func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
+	if b == nil {
+		return // already released when flushed.
+	}
 	// Don't pool buffers larger than 128KB, in case we had some rare large
 	// values.
 	if len(b.b) > 128*1024 {
@@ -459,10 +468,12 @@ var valueBlockWriterPool = sync.Pool{
 func newValueBlockWriter(
 	blockSize int,
 	blockSizeThreshold int,
+	maxBufferedBlocks int,
 	compression Compression,
 	checksumType ChecksumType,
 	// compressedSize should exclude the block trailer.
 	blockFinishedFunc func(compressedSize int),
+	queue *writeQueue,
 ) *valueBlockWriter {
 	w := valueBlockWriterPool.Get().(*valueBlockWriter)
 	*w = valueBlockWriter{
@@ -476,6 +487,8 @@ func newValueBlockWriter(
 		buf:               uncompressedValueBlockBufPool.Get().(*blockBuffer),
 		compressedBuf:     compressedValueBlockBufPool.Get().(*blockBuffer),
 		blocks:            w.blocks[:0],
+		queue:             queue,
+		maxBufferedBlocks: maxBufferedBlocks,
 	}
 	w.buf.b = w.buf.b[:0]
 	w.compressedBuf.b = w.compressedBuf.b[:0]
@@ -514,7 +527,9 @@ func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
 		(blockLen > w.blockSizeThreshold && blockLen+valueLen > w.blockSize) {
 		// Block is not currently empty and adding this value will become too big,
 		// so finish this block.
-		w.compressAndFlush()
+		if err := w.compressAndFlush(); err != nil {
+			return valueHandle{}, err
+		}
 		blockLen = len(w.buf.b)
 		if invariants.Enabled && blockLen != 0 {
 			panic("blockLen of new block should be 0")
@@ -548,7 +563,7 @@ func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
 	return vh, nil
 }
 
-func (w *valueBlockWriter) compressAndFlush() {
+func (w *valueBlockWriter) compressAndFlush() error {
 	// Compress the buffer, discarding the result if the improvement isn't at
 	// least 12.5%.
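	// (For example, a 4096-byte buffer is kept in compressed form only if it
	// shrinks below 7/8 of its size, i.e. to fewer than 3584 bytes.)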
	blockType := noCompressionBlockType
@@ -572,14 +587,18 @@ func (w *valueBlockWriter) compressAndFlush() {
 	}
 	b.b[n] = byte(blockType)
 	w.computeChecksum(b.b)
-	bh := BlockHandle{Offset: w.totalBlockBytes, Length: uint64(n)}
 	w.totalBlockBytes += uint64(len(b.b))
 	// blockFinishedFunc length excludes the block trailer.
 	w.blockFinishedFunc(n)
 	compressed := blockType != noCompressionBlockType
+	if len(w.blocks)-w.unflushedIdx >= w.maxBufferedBlocks {
+		if err := w.flushBufferedBlocks(); err != nil {
+			return err
+		}
+	}
 	w.blocks = append(w.blocks, blockAndHandle{
 		block:      b,
-		handle:     bh,
+		handle:     BlockHandle{Length: uint64(n)}, // Offset is set at flush.
 		compressed: compressed,
 	})
 	// Handed off a buffer to w.blocks, so need get a new one.
@@ -589,6 +608,27 @@
 		w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
 	}
 	w.buf.b = w.buf.b[:0]
+	return nil
+}
+
+func (w *valueBlockWriter) flushBufferedBlocks() error {
+	w.queue.writeMu.Lock()
+	defer w.queue.writeMu.Unlock()
+
+	for i := w.unflushedIdx; i < len(w.blocks); i++ {
+		w.blocks[i].handle.Offset = w.queue.writer.meta.Size
+		if _, err := w.queue.writer.Write(w.blocks[i].block.b); err != nil {
+			return err
+		}
+		if w.blocks[i].compressed {
+			releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].block)
+		} else {
+			releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].block)
+		}
+		w.blocks[i].block = nil
+	}
+	w.unflushedIdx = len(w.blocks)
+	return nil
 }
 
 func (w *valueBlockWriter) computeChecksum(block []byte) {
@@ -598,10 +638,12 @@
 }
 
 func (w *valueBlockWriter) finish(
-	writer io.Writer, fileOffset uint64,
+	writer *Writer,
 ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
 	if len(w.buf.b) > 0 {
-		w.compressAndFlush()
+		if err := w.compressAndFlush(); err != nil {
+			return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
+		}
 	}
 	n := len(w.blocks)
 	if n == 0 {
@@ -609,18 +651,18 @@ func (w *valueBlockWriter) finish(
 	}
 	largestOffset := uint64(0)
 	largestLength := uint64(0)
+
+	if err := w.flushBufferedBlocks(); err != nil {
+		return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
+	}
+
 	for i := range w.blocks {
-		_, err := writer.Write(w.blocks[i].block.b)
-		if err != nil {
-			return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
-		}
-		w.blocks[i].handle.Offset += fileOffset
 		largestOffset = w.blocks[i].handle.Offset
 		if largestLength < w.blocks[i].handle.Length {
 			largestLength = w.blocks[i].handle.Length
 		}
 	}
-	vbihOffset := fileOffset + w.totalBlockBytes
+	vbihOffset := writer.meta.Size
 
 	vbih := valueBlocksIndexHandle{
 		h: BlockHandle{
diff --git a/sstable/write_queue.go b/sstable/write_queue.go
index 898f32755e..dce5f85984 100644
--- a/sstable/write_queue.go
+++ b/sstable/write_queue.go
@@ -32,14 +32,22 @@ func (task *writeTask) clear() {
 	}
 }
 
+// writeQueue represents a queue of writes to the underlying writer.
+//
+// Writes to the underlying writer are performed only while holding the writeMu
+// mutex, so that other users of the underlying writer (such as the
+// valueBlockWriter) can serialize their writes with a running writeQueue.
+//
 // Note that only the Writer client goroutine will be adding tasks to the writeQueue.
 // Both the Writer client and the compression goroutines will be able to write to
 // writeTask.compressionDone to indicate that the compression job associated with
 // a writeTask has finished.
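+//
+// For example, valueBlockWriter.flushBufferedBlocks holds writeMu for the
+// duration of its writes, so a group of buffered value blocks cannot be
+// interleaved with a block being written concurrently by performWrite.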
type writeQueue struct { - tasks chan *writeTask - wg sync.WaitGroup - writer *Writer + tasks chan *writeTask + wg sync.WaitGroup + + writeMu sync.Mutex + writer *Writer // err represents an error which is encountered when the write queue attempts // to write a block to disk. The error is stored here to skip unnecessary block @@ -59,6 +67,9 @@ func newWriteQueue(size int, writer *Writer) *writeQueue { } func (w *writeQueue) performWrite(task *writeTask) error { + w.writeMu.Lock() + defer w.writeMu.Unlock() + var bh BlockHandle var bhp BlockHandleWithProperties @@ -111,8 +122,7 @@ func (w *writeQueue) add(task *writeTask) { w.tasks <- task } -// addSync will perform the writeTask synchronously with the caller goroutine. Calls to addSync -// are no longer valid once writeQueue.add has been called at least once. +// addSync will perform the writeTask synchronously with the caller goroutine. func (w *writeQueue) addSync(task *writeTask) error { // This should instantly return without blocking. <-task.compressionDone diff --git a/sstable/writer.go b/sstable/writer.go index 883ee7cecd..2bd2ac356d 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -2014,7 +2014,7 @@ func (w *Writer) Close() (err error) { } if w.valueBlockWriter != nil { - vbiHandle, vbStats, err := w.valueBlockWriter.finish(w, w.meta.Size) + vbiHandle, vbStats, err := w.valueBlockWriter.finish(w) if err != nil { return err } @@ -2234,13 +2234,18 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write Format: o.Comparer.FormatKey, }, } + + w.coordination.init(o.Parallelism, w) + if w.tableFormat >= TableFormatPebblev3 { w.shortAttributeExtractor = o.ShortAttributeExtractor w.requiredInPlaceValueBound = o.RequiredInPlaceValueBound w.valueBlockWriter = newValueBlockWriter( - w.blockSize, w.blockSizeThreshold, w.compression, w.checksumType, func(compressedSize int) { + w.blockSize, w.blockSizeThreshold, o.ValueBlockBufferLimit, w.compression, w.checksumType, + func(compressedSize int) { w.coordination.sizeEstimate.dataBlockCompressed(compressedSize, 0) - }) + }, + w.coordination.writeQueue) } w.dataBlockBuf = newDataBlockBuf(w.restartInterval, w.checksumType) @@ -2249,8 +2254,6 @@ func NewWriter(writable objstorage.Writable, o WriterOptions, extraOpts ...Write checksummer: checksummer{checksumType: o.Checksum}, } - w.coordination.init(o.Parallelism, w) - if writable == nil { w.err = errors.New("pebble: nil writable") return w diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 20f9e90eec..94f36b1e57 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -254,6 +254,7 @@ func TestWriterWithValueBlocks(t *testing.T) { parallelism = true } t.Logf("writer parallelism %t", parallelism) + attributeExtractor := func( key []byte, keyPrefixLen int, value []byte) (base.ShortAttribute, error) { require.NotNil(t, key) @@ -271,10 +272,13 @@ func TestWriterWithValueBlocks(t *testing.T) { } var meta *WriterMetadata var err error - var blockSize int + var blockSize, bufLimit int if td.HasArg("block-size") { td.ScanArgs(t, "block-size", &blockSize) } + if td.HasArg("buf-limit") { + td.ScanArgs(t, "buf-limit", &bufLimit) + } var inPlaceValueBound UserKeyPrefixBound if td.HasArg("in-place-bound") { var l, u string @@ -289,6 +293,7 @@ func TestWriterWithValueBlocks(t *testing.T) { Parallelism: parallelism, RequiredInPlaceValueBound: inPlaceValueBound, ShortAttributeExtractor: attributeExtractor, + ValueBlockBufferLimit: bufLimit, }, 0) if err != nil { return err.Error()
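
--
Usage note (illustrative, not part of the patch): a minimal sketch of opting
into a custom buffer limit. It assumes in-repo imports (internal/testkeys, as
the sstable package's own tests use) and a caller-supplied objstorage.Writable.
ValueBlockBufferLimit is the only option this patch adds; value blocks require
TableFormatPebblev3 or later and a comparer with a Split function.

	package example

	import (
		"github.com/cockroachdb/pebble/internal/testkeys"
		"github.com/cockroachdb/pebble/objstorage"
		"github.com/cockroachdb/pebble/sstable"
	)

	// buildExample writes a tiny sstable in which the values of older MVCC
	// versions land in value blocks, flushed in groups of at most 32.
	func buildExample(f objstorage.Writable) error {
		w := sstable.NewWriter(f, sstable.WriterOptions{
			Comparer:              testkeys.Comparer, // splits keys at @version suffixes
			TableFormat:           sstable.TableFormatPebblev3,
			BlockSize:             32 << 10,
			ValueBlockBufferLimit: 32, // 0 would mean the default max(16, 8MB/BlockSize)
		})
		// Keys must be added in sorted order; with this comparer blue@10
		// sorts before blue@8, and blue@8's value is eligible for a value
		// block because it is not the latest version of "blue".
		if err := w.Set([]byte("blue@10"), []byte("v10")); err != nil {
			return err
		}
		if err := w.Set([]byte("blue@8"), []byte("v8")); err != nil {
			return err
		}
		return w.Close()
	}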