Skip to content

Commit

Permalink
Use a confined Arena for IOContext.READONCE (apache#13535)
Browse files Browse the repository at this point in the history
Use a confined Arena for IOContext.READONCE.

This change will require inputs opened with READONCE to be consumed and closed on the creating thread. Further testing and assertions can be added as a follow up.
  • Loading branch information
ChrisHegarty authored Jul 10, 2024
1 parent ef215d8 commit da41215
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ public void checkIntegrity() throws IOException {
clone.seek(0);
// checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included
// in SimpleTextUtil.CHECKSUM):
long footerStartPos = data.length() - (SimpleTextUtil.CHECKSUM.length + 21);
long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21);
ChecksumIndexInput input = new BufferedChecksumIndexInput(clone);
while (true) {
SimpleTextUtil.readLine(input, scratch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void checkIntegrity() throws IOException {

// checksum is fixed-width encoded with 20 bytes, plus 1 byte for newline (the space is included
// in SimpleTextUtil.CHECKSUM):
long footerStartPos = dataIn.length() - (SimpleTextUtil.CHECKSUM.length + 21);
long footerStartPos = clone.length() - (SimpleTextUtil.CHECKSUM.length + 21);
ChecksumIndexInput input = new BufferedChecksumIndexInput(clone);
while (true) {
SimpleTextUtil.readLine(input, scratch);
Expand Down
7 changes: 6 additions & 1 deletion lucene/core/src/java/org/apache/lucene/store/IOContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ public enum Context {
*/
public static final IOContext DEFAULT = new IOContext(Constants.DEFAULT_READADVICE);

/** A default context for reads with {@link ReadAdvice#SEQUENTIAL}. */
/**
* A default context for reads with {@link ReadAdvice#SEQUENTIAL}.
*
* <p>This context should only be used when the read operations will be performed in the same
* thread as the thread that opens the underlying storage.
*/
public static final IOContext READONCE = new IOContext(ReadAdvice.SEQUENTIAL);

@SuppressWarnings("incomplete-switch")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ abstract class MemorySegmentIndexInput extends IndexInput
final long length;
final long chunkSizeMask;
final int chunkSizePower;
final boolean confined;
final Arena arena;
final MemorySegment[] segments;

Expand All @@ -67,12 +68,15 @@ public static MemorySegmentIndexInput newInstance(
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
boolean confined) {
assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals);
if (segments.length == 1) {
return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower);
return new SingleSegmentImpl(
resourceDescription, arena, segments[0], length, chunkSizePower, confined);
} else {
return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower);
return new MultiSegmentImpl(
resourceDescription, arena, segments, 0, length, chunkSizePower, confined);
}
}

Expand All @@ -81,12 +85,14 @@ private MemorySegmentIndexInput(
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
boolean confined) {
super(resourceDescription);
this.arena = arena;
this.segments = segments;
this.length = length;
this.chunkSizePower = chunkSizePower;
this.confined = confined;
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
this.curSegment = segments[0];
}
Expand All @@ -97,6 +103,12 @@ void ensureOpen() {
}
}

void ensureAccessible() {
if (confined && curSegment.isAccessibleBy(Thread.currentThread()) == false) {
throw new IllegalStateException("confined");
}
}

// the unused parameter is just to silence javac about unused variables
RuntimeException handlePositionalIOOBE(RuntimeException unused, String action, long pos)
throws IOException {
Expand Down Expand Up @@ -570,6 +582,7 @@ public final MemorySegmentIndexInput slice(
/** Builds the actual sliced IndexInput (may apply extra offset in subclasses). * */
MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long length) {
ensureOpen();
ensureAccessible();

final long sliceEnd = offset + length;
final int startIndex = (int) (offset >>> chunkSizePower);
Expand All @@ -591,15 +604,17 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le
null, // clones don't have an Arena, as they can't close)
slices[0].asSlice(offset, length),
length,
chunkSizePower);
chunkSizePower,
confined);
} else {
return new MultiSegmentImpl(
newResourceDescription,
null, // clones don't have an Arena, as they can't close)
slices,
offset,
length,
chunkSizePower);
chunkSizePower,
confined);
}
}

Expand Down Expand Up @@ -643,8 +658,15 @@ static final class SingleSegmentImpl extends MemorySegmentIndexInput {
Arena arena,
MemorySegment segment,
long length,
int chunkSizePower) {
super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower);
int chunkSizePower,
boolean confined) {
super(
resourceDescription,
arena,
new MemorySegment[] {segment},
length,
chunkSizePower,
confined);
this.curSegmentIndex = 0;
}

Expand Down Expand Up @@ -740,8 +762,9 @@ static final class MultiSegmentImpl extends MemorySegmentIndexInput {
MemorySegment[] segments,
long offset,
long length,
int chunkSizePower) {
super(resourceDescription, arena, segments, length, chunkSizePower);
int chunkSizePower,
boolean confined) {
super(resourceDescription, arena, segments, length, chunkSizePower, confined);
this.offset = offset;
try {
seek(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo
path = Unwrappable.unwrapAll(path);

boolean success = false;
final Arena arena = Arena.ofShared();
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
Expand All @@ -61,7 +62,8 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo
preload,
fileSize),
fileSize,
chunkSizePower);
chunkSizePower,
confined);
success = true;
return in;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.lucene.tests.store.BaseDirectoryTestCase;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.NamedThreadFactory;

/** Tests MMapDirectory */
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows
Expand Down Expand Up @@ -117,4 +122,54 @@ public void testWithNormal() throws Exception {
}
}
}

// Opens the input with ReadAdvice.READONCE to ensure slice and clone are appropriately confined
public void testConfined() throws Exception {
final int size = 16;
byte[] bytes = new byte[size];
random().nextBytes(bytes);

try (Directory dir = new MMapDirectory(createTempDir("testConfined"))) {
try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) {
out.writeBytes(bytes, 0, bytes.length);
}

try (var in = dir.openInput("test", IOContext.READONCE);
var executor = Executors.newFixedThreadPool(1, new NamedThreadFactory("testConfined"))) {
// ensure accessible
assertEquals(16L, in.slice("test", 0, in.length()).length());
assertEquals(15L, in.slice("test", 1, in.length() - 1).length());

// ensure not accessible
Callable<Object> task1 = () -> in.slice("test", 0, in.length());
var x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task1)));
assertTrue(x.getMessage().contains("confined"));

int offset = random().nextInt((int) in.length());
int length = (int) in.length() - offset;
Callable<Object> task2 = () -> in.slice("test", offset, length);
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task2)));
assertTrue(x.getMessage().contains("confined"));

// slice.slice
var slice = in.slice("test", 0, in.length());
Callable<Object> task3 = () -> slice.slice("test", 0, in.length());
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(task3)));
assertTrue(x.getMessage().contains("confined"));
// slice.clone
x = expectThrows(ISE, () -> getAndUnwrap(executor.submit(slice::clone)));
assertTrue(x.getMessage().contains("confined"));
}
}
}

static final Class<IllegalStateException> ISE = IllegalStateException.class;

static Object getAndUnwrap(Future<Object> future) throws Throwable {
try {
return future.get();
} catch (ExecutionException ee) {
throw ee.getCause();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,9 @@ public synchronized IndexInput openInput(String name, IOContext context) throws
false);
}

IndexInput delegateInput =
in.openInput(name, LuceneTestCase.newIOContext(randomState, context));
context = LuceneTestCase.newIOContext(randomState, context);
final boolean confined = context == IOContext.READONCE;
IndexInput delegateInput = in.openInput(name, context);

final IndexInput ii;
int randomInt = randomState.nextInt(500);
Expand All @@ -822,15 +823,15 @@ public synchronized IndexInput openInput(String name, IOContext context) throws
System.out.println(
"MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
}
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput, confined);
} else if (useSlowOpenClosers && randomInt == 1) {
if (LuceneTestCase.VERBOSE) {
System.out.println(
"MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
}
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput, confined);
} else {
ii = new MockIndexInputWrapper(this, name, delegateInput, null);
ii = new MockIndexInputWrapper(this, name, delegateInput, null, confined);
}
addFileHandle(ii, name, Handle.Input);
return ii;
Expand Down
Loading

0 comments on commit da41215

Please sign in to comment.