Skip to content

Commit

Permalink
Add stream based shard division using a Closeable Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tokee committed Oct 13, 2023
1 parent 97fa3e4 commit f33f8bb
Show file tree
Hide file tree
Showing 3 changed files with 342 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,29 @@ public Thread newThread(Runnable r) {
*/
private static final Semaphore gatekeeper = new Semaphore(PropertiesLoader.SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX);

/**
 * Depending on the backing Solr Cloud topology, the collection and the {@link SRequest#shardDivide} and
 * {@link SRequest#autoShardDivideLimit}, either standard collection based document search &amp; delivery or
 * shard dividing search &amp; delivery is used to provide a Stream of {@link SolrDocument}s.
 * <p>
 * This is the stream flavoured counterpart of {@code iterateStrategy(SRequest)}: the result of that call
 * is wrapped directly in a {@link dk.kb.netarchivesuite.solrwayback.util.CollectionUtils.CloseableStream}.
 * <p>
 * Important: This method returns a {@link dk.kb.netarchivesuite.solrwayback.util.CollectionUtils.CloseableStream}
 * and the caller <strong>must</strong> ensure that it is either depleted or closed after use, to avoid resource
 * leaking. It is highly recommended to use {@code try-with-resources} directly on the returned stream:
 * <pre>{@code
 * try (CollectionUtils.CloseableStream<SolrDocument> docs =
 *          SolrStreamShard.streamStrategy(myRequest)) {
 *     long hugeIDs = docs.map(doc -> String.valueOf(doc.get("id")))
 *                        .filter(id -> id.length() > 200)
 *                        .count();
 * }
 * }</pre>
 * @param request stream setup.
 * @return a Stream of {@code SolrDocument}s, as specified in the {@code request}.
 * @throws IllegalArgumentException declared by the signature; NOTE(review): presumably propagated from
 *         {@code iterateStrategy} when the request is invalid - confirm against that method.
 */
public static CollectionUtils.CloseableStream<SolrDocument> streamStrategy(
SRequest request) throws IllegalArgumentException {
// Single-argument constructor: the stream shares the iterator's continueProcessing signal
return new CollectionUtils.CloseableStream<>(iterateStrategy(request));
}


/**
* Depending on the backing Solr Cloud topology, the collection and the {@link SRequest#shardDivide} and
* {@link SRequest#autoShardDivideLimit}, either standard collection based document search & delivery or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,34 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.function.UnaryOperator;
import java.util.stream.BaseStream;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -849,5 +864,289 @@ public static <T> CloseableIterator<T> mergeIteratorsBuffered(
Iterator<T> merged = mergeIterators(constrainedAndBuffered, comparator);
return CloseableIterator.of(merged, continueProcessing);
}

/**
 * Special purpose Stream used by {@link dk.kb.netarchivesuite.solrwayback.solr.SolrStreamShard}
 * for signalling cancellation in case of early iterator termination.
 * <p>
 * Closing the stream sets the shared {@code continueProcessing} signal to {@code false}. The signal is
 * registered as a close handler on the wrapped stream pipeline, so it is raised both when this wrapper
 * is closed and when a stream derived from it (e.g. the result of {@link #map} or {@link #filter}) is
 * closed, as closing any stage of a pipeline runs all close handlers of that pipeline.
 * <p>
 * Important: To avoid resource leaks, this stream must be closed, preferably by a
 * try-with-resources, e.g.
 * <pre>{@code
 * try (CollectionUtils.CloseableStream<SolrDocument> docs =
 *          SolrStreamShard.streamStrategy(myRequest)) {
 *     long hugeIDs = docs.map(doc -> String.valueOf(doc.get("id")))
 *                        .filter(id -> id.length() > 200)
 *                        .count();
 * }
 * }</pre>
 * Note that all intermediate operations return plain {@link Stream}s, not {@code CloseableStream}s.
 */
public static class CloseableStream<T> implements Stream<T>, Closeable, AutoCloseable {
    private final Stream<T> inner;
    private final AtomicBoolean continueProcessing;

    /**
     * Construct a Stream where calls to {@link CloseableStream#close()} sets {@code continueProcessing} to
     * {@code false}. A common use case is to use {@code continueProcessing} as a signal for other parts of
     * the stream chain to also stop processing.
     * @param inner any stream, but often with elements sharing the {@code continueProcessing}.
     *              It must not have been linked or consumed yet, as a close handler is attached to it.
     * @param continueProcessing a signal of whether or not to continue processing in this context.
     * @throws NullPointerException if {@code inner} or {@code continueProcessing} is {@code null}.
     */
    public CloseableStream(Stream<T> inner, AtomicBoolean continueProcessing) {
        if (continueProcessing == null) {
            // Fail at construction time instead of at close time, where the cause would be hard to track
            throw new NullPointerException("continueProcessing must not be null");
        }
        this.continueProcessing = continueProcessing;
        // Registering the signal as a close handler makes it part of the pipeline itself: closing a
        // derived stream (inner.map(...), inner.filter(...) etc.) will then also raise the signal.
        this.inner = inner.onClose(() -> continueProcessing.set(false));
    }

    /**
     * Specialized constructor converting an {@link CloseableIterator} to a {@code CloseableStream}.
     * The {@link CloseableIterator#continueProcessing} signaling boolean will be used for the constructed
     * {@code CloseableStream}.
     * @param iterator an iterator supporting {@link Closeable#close()}.
     */
    public CloseableStream(CloseableIterator<T> iterator) {
        // Characteristics 0 & sequential: nothing is known about the iterator's size or ordering
        this(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false),
             iterator.continueProcessing);
    }

    /**
     * Factory method converting an {@link CloseableIterator} to a {@code CloseableStream}.
     * The {@link CloseableIterator#continueProcessing} signaling boolean will be used for the constructed
     * {@code CloseableStream}.
     * @param iterator an iterator supporting {@link Closeable#close()}.
     * @param <E> the element type of the iterator and the resulting stream.
     * @return a stream delivering the elements of the {@code iterator}.
     */
    public static <E> CloseableStream<E> of(CloseableIterator<E> iterator) {
        return new CloseableStream<>(iterator);
    }

    /**
     * Signals that processing should stop by setting {@code continueProcessing} to {@code false},
     * then closes the wrapped stream, which runs its close handlers.
     */
    @Override
    public void close() {
        // Set explicitly before delegating, so the signal is raised even if the pipeline was already closed
        continueProcessing.set(false);
        inner.close();
    }

    /* Direct delegates below. Intermediate operations return plain JDK streams from the wrapped pipeline. */

    @Override
    public Stream<T> filter(Predicate<? super T> predicate) {
        return inner.filter(predicate);
    }

    @Override
    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return inner.map(mapper);
    }

    @Override
    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return inner.mapToInt(mapper);
    }

    @Override
    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return inner.mapToLong(mapper);
    }

    @Override
    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return inner.mapToDouble(mapper);
    }

    @Override
    public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
        return inner.flatMap(mapper);
    }

    @Override
    public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
        return inner.flatMapToInt(mapper);
    }

    @Override
    public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
        return inner.flatMapToLong(mapper);
    }

    @Override
    public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
        return inner.flatMapToDouble(mapper);
    }

    @Override
    public Stream<T> distinct() {
        return inner.distinct();
    }

    @Override
    public Stream<T> sorted() {
        return inner.sorted();
    }

    @Override
    public Stream<T> sorted(Comparator<? super T> comparator) {
        return inner.sorted(comparator);
    }

    @Override
    public Stream<T> peek(Consumer<? super T> action) {
        return inner.peek(action);
    }

    @Override
    public Stream<T> limit(long maxSize) {
        return inner.limit(maxSize);
    }

    @Override
    public Stream<T> skip(long n) {
        return inner.skip(n);
    }

    @Override
    public void forEach(Consumer<? super T> action) {
        inner.forEach(action);
    }

    @Override
    public void forEachOrdered(Consumer<? super T> action) {
        inner.forEachOrdered(action);
    }

    @Override
    public Object[] toArray() {
        return inner.toArray();
    }

    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        return inner.toArray(generator);
    }

    @Override
    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return inner.reduce(identity, accumulator);
    }

    @Override
    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return inner.reduce(accumulator);
    }

    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
        return inner.reduce(identity, accumulator, combiner);
    }

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return inner.collect(supplier, accumulator, combiner);
    }

    @Override
    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return inner.collect(collector);
    }

    @Override
    public Optional<T> min(Comparator<? super T> comparator) {
        return inner.min(comparator);
    }

    @Override
    public Optional<T> max(Comparator<? super T> comparator) {
        return inner.max(comparator);
    }

    @Override
    public long count() {
        return inner.count();
    }

    @Override
    public boolean anyMatch(Predicate<? super T> predicate) {
        return inner.anyMatch(predicate);
    }

    @Override
    public boolean allMatch(Predicate<? super T> predicate) {
        return inner.allMatch(predicate);
    }

    @Override
    public boolean noneMatch(Predicate<? super T> predicate) {
        return inner.noneMatch(predicate);
    }

    @Override
    public Optional<T> findFirst() {
        return inner.findFirst();
    }

    @Override
    public Optional<T> findAny() {
        return inner.findAny();
    }

    /* Static conveniences mirroring the factories on Stream. They return plain Streams. */

    public static <T1> Builder<T1> builder() {
        return Stream.builder();
    }

    public static <T1> Stream<T1> empty() {
        return Stream.empty();
    }

    public static <T1> Stream<T1> of(T1 t1) {
        return Stream.of(t1);
    }

    @SafeVarargs
    public static <T1> Stream<T1> of(T1... values) {
        return Stream.of(values);
    }

    public static <T1> Stream<T1> iterate(T1 seed, UnaryOperator<T1> f) {
        return Stream.iterate(seed, f);
    }

    public static <T1> Stream<T1> generate(Supplier<T1> s) {
        return Stream.generate(s);
    }

    public static <T1> Stream<T1> concat(Stream<? extends T1> a, Stream<? extends T1> b) {
        return Stream.concat(a, b);
    }

    @Override
    public Iterator<T> iterator() {
        return inner.iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return inner.spliterator();
    }

    @Override
    public boolean isParallel() {
        return inner.isParallel();
    }

    @Override
    public Stream<T> sequential() {
        return inner.sequential();
    }

    @Override
    public Stream<T> parallel() {
        return inner.parallel();
    }

    @Override
    public Stream<T> unordered() {
        return inner.unordered();
    }

    @Override
    public Stream<T> onClose(Runnable closeHandler) {
        return inner.onClose(closeHandler);
    }

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -243,11 +244,29 @@ public void testShardDivideDeduplicate() {
.query("*:*")
.fields("id")
.shardDivide("always")
.maxResults(5)
.maxResults(50)
.deduplicateField("domain");
assertDocsEquals(request);
}

/**
 * Verifies that a shard divided, deduplicated request can be consumed through the stream based
 * entry point and delivers more than a single document. Does nothing if the test Solr is unavailable.
 */
@Test
public void testShardDivideStreaming() {
    if (!AVAILABLE) {
        return;
    }
    PropertiesLoader.SOLR_SERVER = LOCAL_SOLR + "/" + COLLECTION;

    SRequest streamRequest = new SRequest()
            .solrClient(solrClient)
            .query("*:*")
            .fields("id")
            .shardDivide("always")
            .maxResults(50)
            .deduplicateField("domain");

    // try-with-resources ensures that the stream is closed, even if the assertion fails
    try (Stream<SolrDocument> documents = SolrStreamShard.streamStrategy(streamRequest)) {
        long delivered = documents.count();
        assertTrue("More than 1 documents should be returned", delivered > 1);
    }
}

@Test
public void testShardDivideDeduplicateDump() {
if (!AVAILABLE) {
Expand Down

0 comments on commit f33f8bb

Please sign in to comment.