diff --git a/benchmark/src/java/main/one/util/streamex/benchmark/withFirst/WithFirstBenchmark.java b/benchmark/src/java/main/one/util/streamex/benchmark/withFirst/WithFirstBenchmark.java
new file mode 100644
index 00000000..b649ad3f
--- /dev/null
+++ b/benchmark/src/java/main/one/util/streamex/benchmark/withFirst/WithFirstBenchmark.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2015, 2020 StreamEx contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package one.util.streamex.benchmark.withFirst;
+
+import one.util.streamex.IntStreamEx;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Benchmark)
+public class WithFirstBenchmark {
+ @Param({"100000"})
+ private int N;
+
+ @Benchmark
+ public Object[] parallelOld() {
+ return IntStreamEx.range(N).boxed().parallel().withFirstOld().toArray();
+ }
+
+ @Benchmark
+ public boolean parallelOldShortCircuit() {
+ return IntStreamEx.range(N).boxed().parallel().withFirstOld().anyMatch(x -> x.getKey() == -1);
+ }
+
+ @Benchmark
+ public Object[] parallelNew() {
+ return IntStreamEx.range(N).boxed().parallel().withFirst().toArray();
+ }
+
+ @Benchmark
+ public boolean parallelNewShortCircuit() {
+ return IntStreamEx.range(N).boxed().parallel().withFirst().anyMatch(x -> x.getKey() == -1);
+ }
+
+ @Benchmark
+ public Object[] sequentialOld() {
+ return IntStreamEx.range(N).boxed().withFirstOld().toArray();
+ }
+
+ @Benchmark
+ public boolean sequentialOldShortCircuit() {
+ return IntStreamEx.range(N).boxed().withFirstOld().anyMatch(x -> x.getKey() == -1);
+ }
+
+ @Benchmark
+ public Object[] sequentialNew() {
+ return IntStreamEx.range(N).boxed().withFirst().toArray();
+ }
+
+ @Benchmark
+ public boolean sequentialNewShortCircuit() {
+ return IntStreamEx.range(N).boxed().withFirst().anyMatch(x -> x.getKey() == -1);
+ }
+
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(WithFirstBenchmark.class.getSimpleName())
+ .build();
+
+ new Runner(opt).run();
+ }
+}
diff --git a/src/main/java/one/util/streamex/StreamEx.java b/src/main/java/one/util/streamex/StreamEx.java
index 0a0b4688..559b80da 100644
--- a/src/main/java/one/util/streamex/StreamEx.java
+++ b/src/main/java/one/util/streamex/StreamEx.java
@@ -1764,11 +1764,6 @@ public StreamEx intervalMap(BiPredicate super T, ? super T> sameInterva
*
* This is a quasi-intermediate
* operation.
- *
- *
- * The size of the resulting stream is one element less than the input
- * stream. If the input stream is empty or contains just one element, then
- * the output stream will be empty.
*
* @param The element type of the new stream
* @param mapper a non-interfering, stateless function to apply to the first
@@ -1791,11 +1786,6 @@ public StreamEx withFirst(BiFunction super T, ? super T, ? extends R> m
*
* This is a quasi-intermediate
* operation.
- *
- *
- * The size of the resulting stream is one element less than the input
- * stream. If the input stream is empty or contains just one element, then
- * the output stream will be empty.
*
* @return the new stream
* @see #withFirst(BiFunction)
@@ -1807,6 +1797,19 @@ public EntryStream withFirst() {
SimpleImmutableEntry::new);
return new EntryStream<>(spliterator, context);
}
+
+ @Deprecated
+ public StreamEx withFirstOld(BiFunction super T, ? super T, ? extends R> mapper) {
+ WithFirstSpliteratorOld spliterator = new WithFirstSpliteratorOld<>(spliterator(), mapper);
+ return new StreamEx<>(spliterator, context);
+ }
+
+ @Deprecated
+ public EntryStream withFirstOld() {
+ WithFirstSpliteratorOld> spliterator = new WithFirstSpliteratorOld<>(spliterator(),
+ SimpleImmutableEntry::new);
+ return new EntryStream<>(spliterator, context);
+ }
/**
* Creates a new {@link StreamEx} which is the result of applying of the
diff --git a/src/main/java/one/util/streamex/WithFirstSpliterator.java b/src/main/java/one/util/streamex/WithFirstSpliterator.java
index 295ce92e..ad96d45f 100644
--- a/src/main/java/one/util/streamex/WithFirstSpliterator.java
+++ b/src/main/java/one/util/streamex/WithFirstSpliterator.java
@@ -16,153 +16,139 @@
package one.util.streamex;
import java.util.Spliterator;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import one.util.streamex.Internals.CloneableSpliterator;
+import static one.util.streamex.Internals.NONE;
+
/**
* @author Tagir Valeev
*/
-/* package */final class WithFirstSpliterator extends CloneableSpliterator> implements Consumer {
- private static final int STATE_NONE = 0;
- private static final int STATE_FIRST_READ = 1;
- private static final int STATE_INIT = 2;
- private static final int STATE_EMPTY = 3;
-
- private ReentrantLock lock;
+/* package */final class WithFirstSpliterator extends CloneableSpliterator> {
+ private final Object lock = new Object();
private Spliterator source;
private WithFirstSpliterator prefix;
- private volatile T first;
- private volatile int state = STATE_NONE;
+ private T first = none();
+ private T[] firstRefHolder;
+ boolean firstToBeConsumed;
private final BiFunction super T, ? super T, ? extends R> mapper;
- private Consumer super R> action;
WithFirstSpliterator(Spliterator source, BiFunction super T, ? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
- private void acquire() {
- if (lock != null && state == STATE_NONE) {
- lock.lock();
- }
+ @SuppressWarnings("unchecked")
+ static T[] arrayOfNone() {
+ return (T[]) new Object[]{NONE};
}
- private void release() {
- if (lock != null && lock.isHeldByCurrentThread()) {
- lock.unlock();
+ @SuppressWarnings("unchecked")
+ static T none() {
+ return (T) NONE;
+ }
+
+ @Override
+ public Spliterator trySplit() {
+ boolean isInit = first != NONE || (firstRefHolder != null && (first = firstRefHolder[0]) != NONE);
+ if (isInit) {
+ return null;
+ }
+ if (firstRefHolder == null) {
+ firstRefHolder = arrayOfNone();
+ }
+ synchronized (lock) {
+ if (isInit()) {
+ return null;
+ }
+ Spliterator prefix = source.trySplit();
+ if (prefix == null) {
+ return null;
+ }
+ WithFirstSpliterator result = doClone();
+ result.source = prefix;
+ return this.prefix = result;
}
}
-
+
+ private boolean isInit() {
+ return first != NONE || (first = firstRefHolder[0]) != NONE;
+ }
+
@Override
public boolean tryAdvance(Consumer super R> action) {
- if (state == STATE_NONE) {
- acquire();
- try {
+ boolean isNotInit = first == NONE && (firstRefHolder == null || (first = firstRefHolder[0]) == NONE);
+ if (isNotInit) {
+ if (firstRefHolder == null) {
+ return source.tryAdvance(x -> {
+ first = x;
+ action.accept(mapper.apply(first, x));
+ });
+ } else {
doInit();
- } finally {
- release();
+ if (first == NONE) {
+ return false;
+ }
}
}
- if (state == STATE_FIRST_READ) {
- state = STATE_INIT;
+ if (firstToBeConsumed) {
+ firstToBeConsumed = false;
action.accept(mapper.apply(first, first));
return true;
}
- if (state != STATE_INIT)
- return false;
- this.action = action;
- boolean hasNext = source.tryAdvance(this);
- this.action = null;
- return hasNext;
+ return source.tryAdvance(x -> action.accept(mapper.apply(first, x)));
}
-
+
private void doInit() {
- int prefixState = state;
- if (prefixState != STATE_NONE)
- return;
- if (prefix != null) {
- prefix.doInit();
- prefixState = prefix.state;
- }
- if (prefixState == STATE_FIRST_READ || prefixState == STATE_INIT) {
- first = prefix.first;
- state = STATE_INIT;
- return;
+ synchronized (lock) {
+ if (prefix != null) {
+ prefix.doInit();
+ }
+ if (!isInit()) {
+ source.tryAdvance(x -> {
+ firstToBeConsumed = true;
+ first = x;
+ firstRefHolder[0] = x;
+ });
+ }
}
- state = source.tryAdvance(x -> first = x) ? STATE_FIRST_READ : STATE_EMPTY;
}
-
+
@Override
public void forEachRemaining(Consumer super R> action) {
- acquire();
- int myState = state;
- this.action = action;
- if (myState == STATE_FIRST_READ || myState == STATE_INIT) {
- release();
- if (myState == STATE_FIRST_READ) {
- state = STATE_INIT;
- accept(first);
- }
- source.forEachRemaining(this);
- this.action = null;
- return;
- }
- try {
+ if (firstRefHolder == null) {
Consumer init = x -> {
- if (state == STATE_NONE) {
- if (prefix != null) {
- prefix.doInit();
- }
- this.first = (prefix == null || prefix.state == STATE_EMPTY) ? x : prefix.first;
- state = STATE_INIT;
+ if (first == NONE) {
+ first = x;
}
- release();
};
- source.forEachRemaining(init.andThen(this));
- this.action = null;
- } finally {
- release();
+ source.forEachRemaining(init.andThen(x -> action.accept(mapper.apply(first, x))));
+ } else {
+ init();
+ if (firstToBeConsumed) {
+ firstToBeConsumed = false;
+ action.accept(mapper.apply(first, first));
+ }
+ source.forEachRemaining(x -> action.accept(mapper.apply(first, x)));
}
}
-
- @Override
- public Spliterator trySplit() {
- if (state != STATE_NONE)
- return null;
- Spliterator prefix;
- if (lock == null)
- lock = new ReentrantLock();
- acquire();
- try {
- if (state != STATE_NONE)
- return null;
- prefix = source.trySplit();
- if (prefix == null)
- return null;
- WithFirstSpliterator result = doClone();
- result.source = prefix;
- return this.prefix = result;
- } finally {
- release();
+
+ private void init() {
+ if (!isInit()) {
+ doInit();
}
}
-
+
@Override
public long estimateSize() {
return source.estimateSize();
}
-
+
@Override
public int characteristics() {
return NONNULL
- | (source.characteristics() & (DISTINCT | IMMUTABLE | CONCURRENT | ORDERED | (lock == null ? SIZED : 0)));
- }
-
- @Override
- public void accept(T x) {
- action.accept(mapper.apply(first, x));
+ | (source.characteristics() & (DISTINCT | IMMUTABLE | CONCURRENT | ORDERED | (firstRefHolder == null ? SIZED : 0)));
}
}
diff --git a/src/main/java/one/util/streamex/WithFirstSpliteratorOld.java b/src/main/java/one/util/streamex/WithFirstSpliteratorOld.java
new file mode 100644
index 00000000..aa9c5f77
--- /dev/null
+++ b/src/main/java/one/util/streamex/WithFirstSpliteratorOld.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2015, 2019 StreamEx contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package one.util.streamex;
+
+import java.util.Spliterator;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import one.util.streamex.Internals.CloneableSpliterator;
+
+/**
+ * @author Tagir Valeev
+ */
+/* package */final @Deprecated class WithFirstSpliteratorOld extends CloneableSpliterator> implements Consumer {
+ private static final int STATE_NONE = 0;
+ private static final int STATE_FIRST_READ = 1;
+ private static final int STATE_INIT = 2;
+ private static final int STATE_EMPTY = 3;
+
+ private ReentrantLock lock;
+ private Spliterator source;
+ private WithFirstSpliteratorOld prefix;
+ private volatile T first;
+ private volatile int state = STATE_NONE;
+ private final BiFunction super T, ? super T, ? extends R> mapper;
+ private Consumer super R> action;
+
+ WithFirstSpliteratorOld(Spliterator source, BiFunction super T, ? super T, ? extends R> mapper) {
+ this.source = source;
+ this.mapper = mapper;
+ }
+
+ private void acquire() {
+ if (lock != null && state == STATE_NONE) {
+ lock.lock();
+ }
+ }
+
+ private void release() {
+ if (lock != null && lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean tryAdvance(Consumer super R> action) {
+ if (state == STATE_NONE) {
+ acquire();
+ try {
+ doInit();
+ } finally {
+ release();
+ }
+ }
+ if (state == STATE_FIRST_READ) {
+ state = STATE_INIT;
+ action.accept(mapper.apply(first, first));
+ return true;
+ }
+ if (state != STATE_INIT)
+ return false;
+ this.action = action;
+ boolean hasNext = source.tryAdvance(this);
+ this.action = null;
+ return hasNext;
+ }
+
+ private void doInit() {
+ int prefixState = state;
+ if (prefixState != STATE_NONE)
+ return;
+ if (prefix != null) {
+ prefix.doInit();
+ prefixState = prefix.state;
+ }
+ if (prefixState == STATE_FIRST_READ || prefixState == STATE_INIT) {
+ first = prefix.first;
+ state = STATE_INIT;
+ return;
+ }
+ state = source.tryAdvance(x -> first = x) ? STATE_FIRST_READ : STATE_EMPTY;
+ }
+
+ @Override
+ public void forEachRemaining(Consumer super R> action) {
+ acquire();
+ int myState = state;
+ this.action = action;
+ if (myState == STATE_FIRST_READ || myState == STATE_INIT) {
+ release();
+ if (myState == STATE_FIRST_READ) {
+ state = STATE_INIT;
+ accept(first);
+ }
+ source.forEachRemaining(this);
+ this.action = null;
+ return;
+ }
+ try {
+ Consumer init = x -> {
+ if (state == STATE_NONE) {
+ if (prefix != null) {
+ prefix.doInit();
+ }
+ this.first = (prefix == null || prefix.state == STATE_EMPTY) ? x : prefix.first;
+ state = STATE_INIT;
+ }
+ release();
+ };
+ source.forEachRemaining(init.andThen(this));
+ this.action = null;
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public Spliterator trySplit() {
+ if (state != STATE_NONE)
+ return null;
+ Spliterator prefix;
+ if (lock == null)
+ lock = new ReentrantLock();
+ acquire();
+ try {
+ if (state != STATE_NONE)
+ return null;
+ prefix = source.trySplit();
+ if (prefix == null)
+ return null;
+ WithFirstSpliteratorOld result = doClone();
+ result.source = prefix;
+ return this.prefix = result;
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public long estimateSize() {
+ return source.estimateSize();
+ }
+
+ @Override
+ public int characteristics() {
+ return NONNULL
+ | (source.characteristics() & (DISTINCT | IMMUTABLE | CONCURRENT | ORDERED | (lock == null ? SIZED : 0)));
+ }
+
+ @Override
+ public void accept(T x) {
+ action.accept(mapper.apply(first, x));
+ }
+}
diff --git a/src/test/java/one/util/streamex/StreamExTest.java b/src/test/java/one/util/streamex/StreamExTest.java
index 532dd92c..d23dd61a 100644
--- a/src/test/java/one/util/streamex/StreamExTest.java
+++ b/src/test/java/one/util/streamex/StreamExTest.java
@@ -1953,6 +1953,8 @@ public void testWithFirst() {
streamEx(() -> StreamEx.of(5, 10, 13, 12, 11), s -> assertEquals(asList("5+0", "5+5", "5+8", "5+7", "5+6"), s
.get().withFirst((a, b) -> a + "+" + (b - a)).toList()));
+
+ streamEx(StreamEx::empty, s -> assertEquals(Optional.empty(), s.get().withFirst().findAny()));
}
@Test