Skip to content

Commit

Permalink
Merge branch 'master' into limited-joiner
Browse files Browse the repository at this point in the history
  • Loading branch information
amaembo committed Nov 1, 2015
2 parents d8bc415 + 678e8f7 commit daa1e80
Show file tree
Hide file tree
Showing 24 changed files with 1,105 additions and 335 deletions.
12 changes: 11 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# StreamEx changes

### 0.4.1

* Added: `StreamEx/IntStreamEx/LongStreamEx/DoubleStreamEx.mapLast/mapFirst` methods.
* Added: `MoreCollectors.flatMapping` collector.
* Fixed: `StreamEx.cross(mapper)` now correctly handles the case when mapper returns null instead of empty stream.
* Optimized: ordered stateful short-circuit collectors now may process less elements in parallel.
* Optimized: `StreamEx/EntryStream.toList()/toListAndThen()/foldRight()/scanRight()` now faster, especially for sized stream.
* Updated documentation.

### 0.4.0

* Introduced the concept of short-circuiting collectors.
Expand All @@ -17,7 +26,8 @@
* Added `IntStreamEx/LongStreamEx.range/rangeClosed` methods with additional step parameter.
* Added `IntStreamEx/LongStreamEx/DoubleStreamEx.foldLeft` methods.
* Methods `StreamEx/EntryStream.toMap/toSortedMap/toCustomMap` without merge function now produce better exception message in the case of duplicate keys.
* Methods `StreamEx/EntryStream.toMap/toSortedMap/toCustomMap` accepting merge function do not return ConcurrentMap for parallel streams now (this caused incorrect merging for non-commutative merger functions).
* Methods `StreamEx/EntryStream.toMap/toSortedMap/toCustomMap` accepting merge function are not guaranteed to return ConcurrentMap for parallel streams now. They however guarantee now the correct merging order for non-commutative merger functions.
* Methods `StreamEx/EntryStream.grouping*` are not guaranteed to return the ConcurrentMap for parallel streams now. They however guarantee now the correct order of downstream collection.
* Methods `StreamEx.ofEntries` are declared as deprecated and may be removed in future releases!
* Deprecated methods `EntryStream.mapEntryKeys`/`mapEntryValues` are removed!
* Updated documentation
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# StreamEx
## Note for the users

I'm thinking about changing the package name for the library. If you like StreamEx, please take a minute to visit [this issue](https://github.com/amaembo/streamex/issues/8) and submit your thoughts. Thank you!

# StreamEx 0.4.0
Enhancing Java 8 Streams.

This library defines four classes: `StreamEx`, `IntStreamEx`, `LongStreamEx`, `DoubleStreamEx`
Expand Down Expand Up @@ -115,7 +119,7 @@ To use from maven add this snippet to the pom.xml `dependencies` section:
<dependency>
<groupId>io.github.amaembo</groupId>
<artifactId>streamex</artifactId>
<version>0.3.8</version>
<version>0.4.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>io.github.amaembo</groupId>
<artifactId>streamex</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>StreamEx</name>
Expand Down
58 changes: 32 additions & 26 deletions src/main/java/javax/util/streamex/AbstractStreamEx.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,15 @@ final <K, V, M extends Map<K, V>> M toMapThrowing(Function<? super T, ? extends
final <K, V, M extends Map<K, V>> void addToMap(M map, K key, V val) {
V oldVal = map.putIfAbsent(key, val);
if (oldVal != null) {
throw new IllegalStateException("Duplicate entry for key '" + key + "' (attempt to merge values '"
+ oldVal + "' and '" + val + "')");
throw new IllegalStateException("Duplicate entry for key '" + key + "' (attempt to merge values '" + oldVal
+ "' and '" + val + "')");
}
}

<R, A> R rawCollect(Collector<? super T, A, R> collector) {
return stream.collect(collector);
}

abstract S supply(Stream<T> stream);

@Override
Expand Down Expand Up @@ -268,14 +272,13 @@ public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator,
*/
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
if (collector instanceof CancellableCollector) {
CancellableCollector<? super T, A, R> c = (CancellableCollector<? super T, A, R>) collector;
BiConsumer<A, ? super T> acc = c.accumulator();
Predicate<A> finished = c.finished();
BinaryOperator<A> combiner = c.combiner();
Predicate<A> finished = finished(collector);
if (finished != null) {
BiConsumer<A, ? super T> acc = collector.accumulator();
BinaryOperator<A> combiner = collector.combiner();
Spliterator<T> spliterator = stream.spliterator();
if (!isParallel()) {
A a = c.supplier().get();
A a = collector.supplier().get();
if (!finished.test(a)) {
try {
// forEachRemaining can be much faster
Expand All @@ -290,20 +293,18 @@ public <R, A> R collect(Collector<? super T, A, R> collector) {
// ignore
}
}
return c.finisher().apply(a);
return collector.finisher().apply(a);
}
Spliterator<A> spltr;
if (!spliterator.hasCharacteristics(Spliterator.ORDERED)
|| c.characteristics().contains(Characteristics.UNORDERED)) {
spltr = new UnorderedCancellableSpliterator<>(spliterator, c.supplier(), acc, combiner, finished);
return c.finisher().apply(strategy().newStreamEx(StreamSupport.stream(spltr, true)).findAny().get());
|| collector.characteristics().contains(Characteristics.UNORDERED)) {
spltr = new UnorderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner, finished);
} else {
spltr = new OrderedCancellableSpliterator<>(spliterator, c.supplier(), acc, finished);
return c.finisher().apply(
strategy().newStreamEx(StreamSupport.stream(spltr, true)).reduce(combiner).get());
spltr = new OrderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner, finished);
}
return collector.finisher().apply(strategy().newStreamEx(StreamSupport.stream(spltr, true)).findFirst().get());
}
return stream.collect(collector);
return rawCollect(collector);
}

@Override
Expand Down Expand Up @@ -1010,19 +1011,21 @@ public S prepend(Stream<? extends T> other) {
}

/**
* Returns a {@link List} containing the elements of this stream. There are
* no guarantees on the type, mutability, serializability, or thread-safety
* of the {@code List} returned; if more control over the returned
* {@code List} is required, use {@link #toCollection(Supplier)}.
* Returns a {@link List} containing the elements of this stream. The
* returned {@code List} is guaranteed to be mutable, but there are no
* guarantees on the type, serializability, or thread-safety; if more
* control over the returned {@code List} is required, use
* {@link #toCollection(Supplier)}.
*
* <p>
* This is a terminal operation.
*
* @return a {@code List} containing the elements of this stream
* @see Collectors#toList()
*/
@SuppressWarnings("unchecked")
public List<T> toList() {
return collect(Collectors.toList());
return new ArrayList<T>((Collection<T>) new ArrayCollection(toArray()));
}

/**
Expand All @@ -1039,16 +1042,18 @@ public List<T> toList() {
* @return result of applying the finisher transformation to the list of the
* stream elements.
* @since 0.2.3
* @see #toList()
*/
public <R> R toListAndThen(Function<List<T>, R> finisher) {
return collect(Collectors.collectingAndThen(Collectors.toList(), finisher));
return finisher.apply(toList());
}

/**
* Returns a {@link Set} containing the elements of this stream. There are
* no guarantees on the type, mutability, serializability, or thread-safety
* of the {@code Set} returned; if more control over the returned
* {@code Set} is required, use {@link #toCollection(Supplier)}.
* Returns a {@link Set} containing the elements of this stream. The
* returned {@code Set} is guaranteed to be mutable, but there are no
* guarantees on the type, serializability, or thread-safety; if more
* control over the returned {@code Set} is required, use
* {@link #toCollection(Supplier)}.
*
* <p>
* This is a terminal operation.
Expand All @@ -1074,6 +1079,7 @@ public Set<T> toSet() {
* @return result of applying the finisher transformation to the set of the
* stream elements.
* @since 0.2.3
* @see #toSet()
*/
public <R> R toSetAndThen(Function<Set<T>, R> finisher) {
return collect(Collectors.collectingAndThen(Collectors.toSet(), finisher));
Expand Down
29 changes: 0 additions & 29 deletions src/main/java/javax/util/streamex/CancellableCollector.java

This file was deleted.

18 changes: 9 additions & 9 deletions src/main/java/javax/util/streamex/CollapseSpliterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public boolean tryAdvance(Consumer<? super R> action) {
return true;
}
}
if(cur == none()) {// start
if(cur == NONE) {// start
if(!source.tryAdvance(this)) {
return accept(pushRight(none(), none()), action);
}
Expand All @@ -124,23 +124,23 @@ public void forEachRemaining(Consumer<? super R> action) {
while (left != null) {
accept(handleLeft(), action);
}
if(cur == none()) {
if(!source.tryAdvance(this)) {
accept(pushRight(none(), none()), action);
return;
}
if(cur != NONE) {
acc = mapper.apply(cur);
}
acc = mapper.apply(cur);
source.forEachRemaining(next -> {
if(!this.mergeable.test(cur, next)) {
if(cur == NONE) {
acc = mapper.apply(next);
} else if(!this.mergeable.test(cur, next)) {
action.accept(acc);
acc = mapper.apply(next);
} else {
acc = accumulator.apply(acc, next);
}
cur = next;
});
if(accept(pushRight(acc, cur), action)) {
if(cur == NONE) {
accept(pushRight(none(), none()), action);
} else if(accept(pushRight(acc, cur), action)) {
if(right != null) {
action.accept(right.acc);
right = null;
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/javax/util/streamex/DoubleStreamEx.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,48 @@ public DoubleStreamEx map(DoubleUnaryOperator mapper) {
return strategy().newDoubleStreamEx(stream.map(mapper));
}

/**
* Returns a stream where the first element is the replaced with the result
* of applying the given function while the other elements are left intact.
*
* <p>
* This is an <a href="package-summary.html#StreamOps">quasi-intermediate
* operation</a>.
*
* @param mapper
* a <a
* href="package-summary.html#NonInterference">non-interfering
* </a>, <a
* href="package-summary.html#Statelessness">stateless</a>
* function to apply to the first element
* @return the new stream
* @since 0.4.1
*/
public DoubleStreamEx mapFirst(DoubleUnaryOperator mapper) {
return boxed().mapFirst(mapper::applyAsDouble).mapToDouble(Double::doubleValue);
}

/**
* Returns a stream where the last element is the replaced with the result
* of applying the given function while the other elements are left intact.
*
* <p>
* This is an <a href="package-summary.html#StreamOps">quasi-intermediate
* operation</a>.
*
* @param mapper
* a <a
* href="package-summary.html#NonInterference">non-interfering
* </a>, <a
* href="package-summary.html#Statelessness">stateless</a>
* function to apply to the first element
* @return the new stream
* @since 0.4.1
*/
public DoubleStreamEx mapLast(DoubleUnaryOperator mapper) {
return boxed().mapLast(mapper::applyAsDouble).mapToDouble(Double::doubleValue);
}

@Override
public <U> StreamEx<U> mapToObj(DoubleFunction<? extends U> mapper) {
return strategy().newStreamEx(stream.mapToObj(mapper));
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/javax/util/streamex/EntryStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -1015,7 +1016,7 @@ public <M extends Map<K, List<V>>> M grouping(Supplier<M> mapSupplier) {
public <A, D> Map<K, D> grouping(Collector<? super V, A, D> downstream) {
Function<Entry<K, V>, K> keyMapper = Entry::getKey;
Collector<Entry<K, V>, ?, D> mapping = Collectors.mapping(Entry::getValue, downstream);
if (stream.isParallel()) {
if (stream.isParallel() && downstream.characteristics().contains(Characteristics.UNORDERED)) {
return collect(Collectors.groupingByConcurrent(keyMapper, mapping));
}
return collect(Collectors.groupingBy(keyMapper, mapping));
Expand All @@ -1025,7 +1026,8 @@ public <A, D> Map<K, D> grouping(Collector<? super V, A, D> downstream) {
public <A, D, M extends Map<K, D>> M grouping(Supplier<M> mapSupplier, Collector<? super V, A, D> downstream) {
Function<Entry<K, V>, K> keyMapper = Entry::getKey;
Collector<Entry<K, V>, ?, D> mapping = Collectors.mapping(Entry::getValue, downstream);
if (stream.isParallel() && mapSupplier.get() instanceof ConcurrentMap) {
if (stream.isParallel() && downstream.characteristics().contains(Characteristics.UNORDERED)
&& mapSupplier.get() instanceof ConcurrentMap) {
return (M) collect(Collectors.groupingByConcurrent(keyMapper,
(Supplier<? extends ConcurrentMap<K, D>>) mapSupplier, mapping));
}
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/javax/util/streamex/IntStreamEx.java
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,49 @@ public IntStreamEx dropWhile(IntPredicate predicate) {
}
return delegate(new TDOfInt(stream.spliterator(), true, predicate));
}

/**
* Returns a stream where the first element is the replaced with the result
* of applying the given function while the other elements are left intact.
*
* <p>
* This is an <a href="package-summary.html#StreamOps">quasi-intermediate
* operation</a>.
*
* @param mapper
* a <a
* href="package-summary.html#NonInterference">non-interfering
* </a>, <a
* href="package-summary.html#Statelessness">stateless</a>
* function to apply to the first element
* @return the new stream
* @since 0.4.1
*/
public IntStreamEx mapFirst(IntUnaryOperator mapper) {
return mapToObj(Integer::new).mapFirst(mapper::applyAsInt).mapToInt(Integer::intValue);
}

/**
* Returns a stream where the last element is the replaced with the result
* of applying the given function while the other elements are left intact.
*
* <p>
* This is an <a href="package-summary.html#StreamOps">quasi-intermediate
* operation</a>.
*
* @param mapper
* a <a
* href="package-summary.html#NonInterference">non-interfering
* </a>, <a
* href="package-summary.html#Statelessness">stateless</a>
* function to apply to the first element
* @return the new stream
* @since 0.4.1
*/
public IntStreamEx mapLast(IntUnaryOperator mapper) {
return mapToObj(Integer::new).mapLast(mapper::applyAsInt).mapToInt(Integer::intValue);
}

/**
* Returns an empty sequential {@code IntStreamEx}.
*
Expand Down
Loading

0 comments on commit daa1e80

Please sign in to comment.