Skip to content

Commit

Permalink
CancellableCollector is an abstract class now (at least until it will be
Browse files Browse the repository at this point in the history
converted to public API)
  • Loading branch information
amaembo committed Nov 1, 2015
1 parent d9a2932 commit 678e8f7
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 46 deletions.
21 changes: 10 additions & 11 deletions src/main/java/javax/util/streamex/AbstractStreamEx.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,13 @@ public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator,
*/
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
if (collector instanceof CancellableCollector) {
CancellableCollector<? super T, A, R> c = (CancellableCollector<? super T, A, R>) collector;
BiConsumer<A, ? super T> acc = c.accumulator();
Predicate<A> finished = c.finished();
BinaryOperator<A> combiner = c.combiner();
Predicate<A> finished = finished(collector);
if (finished != null) {
BiConsumer<A, ? super T> acc = collector.accumulator();
BinaryOperator<A> combiner = collector.combiner();
Spliterator<T> spliterator = stream.spliterator();
if (!isParallel()) {
A a = c.supplier().get();
A a = collector.supplier().get();
if (!finished.test(a)) {
try {
// forEachRemaining can be much faster
Expand All @@ -294,16 +293,16 @@ public <R, A> R collect(Collector<? super T, A, R> collector) {
// ignore
}
}
return c.finisher().apply(a);
return collector.finisher().apply(a);
}
Spliterator<A> spltr;
if (!spliterator.hasCharacteristics(Spliterator.ORDERED)
|| c.characteristics().contains(Characteristics.UNORDERED)) {
spltr = new UnorderedCancellableSpliterator<>(spliterator, c.supplier(), acc, combiner, finished);
|| collector.characteristics().contains(Characteristics.UNORDERED)) {
spltr = new UnorderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner, finished);
} else {
spltr = new OrderedCancellableSpliterator<>(spliterator, c.supplier(), acc, combiner, finished);
spltr = new OrderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner, finished);
}
return c.finisher().apply(strategy().newStreamEx(StreamSupport.stream(spltr, true)).findFirst().get());
return collector.finisher().apply(strategy().newStreamEx(StreamSupport.stream(spltr, true)).findFirst().get());
}
return rawCollect(collector);
}
Expand Down
29 changes: 0 additions & 29 deletions src/main/java/javax/util/streamex/CancellableCollector.java

This file was deleted.

10 changes: 7 additions & 3 deletions src/main/java/javax/util/streamex/StreamExInternals.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ <T> Collector<T, A, R> asRef(BiConsumer<A, T> accumulator) {
characteristics.toArray(new Characteristics[characteristics.size()]));
}

<T> CancellableCollector<T, A, R> asCancellable(BiConsumer<A, T> accumulator, Predicate<A> finished) {
<T> Collector<T, A, R> asCancellable(BiConsumer<A, T> accumulator, Predicate<A> finished) {
return new CancellableCollectorImpl<>(supplier, accumulator, combiner(), finisher, finished,
characteristics);
}
Expand Down Expand Up @@ -553,8 +553,12 @@ static PartialCollector<StringBuilder, String> joining(CharSequence delimiter, C
return new PartialCollector<>(supplier, merger, StringBuilder::toString, NO_CHARACTERISTICS);
}
}

static abstract class CancellableCollector<T, A, R> implements Collector<T, A, R> {
abstract Predicate<A> finished();
}

static final class CancellableCollectorImpl<T, A, R> implements CancellableCollector<T, A, R> {
static final class CancellableCollectorImpl<T, A, R> extends CancellableCollector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
Expand Down Expand Up @@ -599,7 +603,7 @@ public Set<Characteristics> characteristics() {
}

@Override
public Predicate<A> finished() {
Predicate<A> finished() {
return finished;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/test/java/javax/util/streamex/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package javax.util.streamex;

import static org.junit.Assert.*;
import static javax.util.streamex.StreamExInternals.*;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -101,7 +102,7 @@ static <T> List<StreamExSupplier<T>> streamEx(Supplier<Stream<T>> base) {
}

static <T, R> void checkCollectorEmpty(String message, R expected, Collector<T, ?, R> collector) {
if (collector instanceof CancellableCollector)
if (finished(collector) != null)
checkShortCircuitCollector(message, expected, 0, Stream::empty, collector);
else
checkCollector(message, expected, Stream::empty, collector);
Expand All @@ -114,7 +115,7 @@ static <T, R> void checkShortCircuitCollector(String message, R expected, int ex

static <T, R> void checkShortCircuitCollector(String message, R expected, int expectedConsumedElements,
Supplier<Stream<T>> base, Collector<T, ?, R> collector, boolean skipIdentity) {
assertTrue(message, collector instanceof CancellableCollector);
assertNotNull(message, finished(collector));
Collector<T, ?, R> withIdentity = Collectors.collectingAndThen(collector, Function.identity());
for (StreamExSupplier<T> supplier : streamEx(base)) {
AtomicInteger counter = new AtomicInteger();
Expand All @@ -129,7 +130,7 @@ static <T, R> void checkShortCircuitCollector(String message, R expected, int ex

static <T, R> void checkCollector(String message, R expected, Supplier<Stream<T>> base, Collector<T, ?, R> collector) {
// use checkShortCircuitCollector for CancellableCollector
assertFalse(message, collector instanceof CancellableCollector);
assertNull(message, finished(collector));
for (StreamExSupplier<T> supplier : streamEx(base)) {
assertEquals(message + ": " + supplier, expected, supplier.get().collect(collector));
}
Expand Down

0 comments on commit 678e8f7

Please sign in to comment.