Skip to content

Commit

Permalink
sc8s: add collectRight to Either handling and align flattenF with oth…
Browse files Browse the repository at this point in the history
…er impls
  • Loading branch information
Marius Soutier authored and an-tex committed Nov 4, 2024
1 parent 57444e8 commit fa2f4a2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
10 changes: 8 additions & 2 deletions akka-stream-utils/jvm/src/test/scala/StreamOpsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,18 @@ class StreamOpsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with

checkTable2[Boolean, Int, Either](input, operations)
}
"Either flattenF" in {
"Either collectRightF" in {
Source(Seq(Right(1), Left(true), Right(2)))
.flattenF
.collectRightF
.runWith(Sink.seq)
.futureValue shouldBe Seq(1, 2)
}
"Either flattenF" in {
Source(Seq(Right(Right(1)), Left(true), Right(Left(false)), Right(Right(2))))
.flattenF
.runWith(Sink.seq)
.futureValue shouldBe Seq(Right(1), Left(true), Left(false), Right(2))
}
"Either flatMapMergeF" in {
Source(Seq(Right(1), Left(true), Right(2)))
.flatMapMergeF(8, element => Source(Seq(element * 2, element * 4)))
Expand Down
8 changes: 7 additions & 1 deletion akka-stream-utils/shared/src/main/scala/StreamOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,16 @@ object StreamOps {
case _ => Left(zero)
})

def flattenF: s.Repr[OutR] = s.collect {
def collectRightF: s.Repr[OutR] = s.collect {
case Right(value) => value
}

def flattenF[OutR2](implicit ev: OutR <:< Either[OutL, OutR2]): s.Repr[Either[OutL, OutR2]] =
s.map {
case Left(outL) => Left(outL)
case Right(outR) => ev.apply(outR)
}

def groupByF[K](maxSubstreams: Int, f: OutR => K): SubFlow[Either[OutL, OutR], Mat, s.Repr, s.Closed] = {
s.groupBy(maxSubstreams, {
case Right(value) => Some(f(value))
Expand Down

0 comments on commit fa2f4a2

Please sign in to comment.