Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ox to 0.5.1 #2310

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ val zio2Version = "2.1.11"
val zio1InteropRsVersion = "1.3.12"
val zio2InteropRsVersion = "2.0.2"

val oxVersion = "0.4.0"
val oxVersion = "0.5.1"
val sttpModelVersion = "1.7.11"
val sttpSharedVersion = "1.3.22"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package sttp.client4.impl.ox.sse

import ox.*
import ox.channels.Source
import ox.flow.Flow
import sttp.model.sse.ServerSentEvent

import java.io.InputStream

object OxServerSentEvents:
def parse(is: InputStream)(using Ox): Source[ServerSentEvent] =
Source
def parse(is: InputStream): Flow[ServerSentEvent] =
Flow
.fromInputStream(is)
.linesUtf8
.mapStatefulConcat(() => List.empty[String])(
Expand All @@ -18,5 +17,5 @@ object OxServerSentEvents:
else None
}
)
.filterAsView(_.nonEmpty)
.filter(_.nonEmpty)
.map(ServerSentEvent.parse)
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ import sttp.client4.ws.SyncWebSocket
import sttp.ws.WebSocketFrame

import scala.util.control.NonFatal
import ox.flow.Flow

/** Converts a [[SyncWebSocket]] into a pair of `Source` of server responses and a `Sink` for client requests. The
/** Converts a [[SyncWebSocket]] into a pair of [[Source]] of server responses and a [[Sink]] for client requests. The
* `Source` starts receiving frames immediately, its internal buffer size can be adjusted with an implicit
* [[ox.channels.StageCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that
* server-side Close signal is received and handled. If you don't want to process frames from the server, you can at
* [[ox.channels.BufferCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that
* server-side close signal is received and handled. If you don't want to process frames from the server, you can at
* least handle it with a `fork { source.drain() }`.
*
* You don't need to manually call `ws.close()` when using this approach, this will be handled automatically
* underneath, according to following rules:
* - If the request `Sink` is closed due to an upstream error, a Close frame is sent, and the `Source` with incoming
* responses gets completed as `Done`.
* - If the request `Sink` completes as `Done`, a `Close` frame is sent, and the response `Sink` keeps receiving
* - If the request sink is closed due to an upstream error, a close frame is sent. The response sink keeps receiving
* responses, until the enclosing [[Ox]] scope ends (that is controlled by the caller). When this happens, the fork
* which populates the response channel will be interrupted.
* - If the request sink completes as done, a close frame is sent. As above, the response sink keeps receiving
* responses until the server closes communication.
* - If the response `Source` is closed by a Close frome from the server or due to an error, the request Sink is
* closed as `Done`, which will still send all outstanding buffered frames, and then finish.
* - If the response source is closed by a close frame from the server or due to an error, the request sink is closed
* as done. This will attempt to send all outstanding buffered frames, unless the enclosing scope ends beforehand).
*
* @param ws
* a `SyncWebSocket` where the underlying `Sink` will send requests, and where the `Source` will pull responses from.
Expand All @@ -29,38 +31,30 @@ import scala.util.control.NonFatal
*/
def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(using
Ox,
StageCapacity
BufferCapacity
): (Source[WebSocketFrame], Sink[WebSocketFrame]) =
val requestsChannel = StageCapacity.newChannel[WebSocketFrame]
val responsesChannel = StageCapacity.newChannel[WebSocketFrame]
fork {
try
val requestsChannel = BufferCapacity.newChannel[WebSocketFrame]

val responsesChannel = Flow
.usingEmit[WebSocketFrame] { emit =>
repeatWhile {
ws.receive() match
case frame: WebSocketFrame.Data[_] =>
responsesChannel.sendOrClosed(frame) match
case _: ChannelClosed => false
case _ => true
case WebSocketFrame.Close(status, msg) if status > 1001 =>
responsesChannel.errorOrClosed(new WebSocketClosedWithError(status, msg)).discard
false
case _: WebSocketFrame.Close =>
responsesChannel.doneOrClosed().discard
false
case frame: WebSocketFrame.Data[_] => emit(frame); true
case WebSocketFrame.Close(status, msg) if status > 1001 => throw new WebSocketClosedWithError(status, msg)
case _: WebSocketFrame.Close => false
case ping: WebSocketFrame.Ping =>
requestsChannel.sendOrClosed(WebSocketFrame.Pong(ping.payload)).discard
// Keep receiving even if pong couldn't be send due to closed request channel. We want to process
// Keep receiving even if pong couldn't be sent due to closed request channel. We want to process
// whatever responses there are still coming from the server until it signals the end with a Close frome.
true
case _: WebSocketFrame.Pong =>
// ignore pongs
true
}
catch
case NonFatal(err) =>
responsesChannel.errorOrClosed(err).discard
finally requestsChannel.doneOrClosed().discard
}.discard
}
.pipe(optionallyConcatenateFrames(_, concatenateFragmented))
.onComplete(requestsChannel.doneOrClosed().discard)
.runToChannel()

fork {
try
Expand All @@ -78,7 +72,7 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us
case ChannelClosed.Error(err) =>
// There's no proper "client error" status. Statuses 4000+ are available for custom cases
ws.send(WebSocketFrame.Close(4000, "Client error"))
responsesChannel.doneOrClosed().discard
// Assuming the responsesChannel fork will get interrupted because the enclosing scope will end
false
}
catch
Expand All @@ -87,17 +81,17 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us
if (!responsesChannel.isClosedForReceive) requestsChannel.errorOrClosed(err).discard
}.discard

(optionallyConcatenateFrames(responsesChannel, concatenateFragmented), requestsChannel)
(responsesChannel, requestsChannel)

final case class WebSocketClosedWithError(statusCode: Int, msg: String)
extends Exception(s"WebSocket closed with status $statusCode: $msg")

private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate: Boolean)(using
private def optionallyConcatenateFrames(f: Flow[WebSocketFrame], doConcatenate: Boolean)(using
Ox
): Source[WebSocketFrame] =
): Flow[WebSocketFrame] =
if doConcatenate then
type Accumulator = Option[Either[Array[Byte], String]]
s.mapStateful(() => None: Accumulator) {
f.mapStateful(() => None: Accumulator) {
case (None, f: WebSocketFrame.Ping) => (None, Some(f))
case (None, f: WebSocketFrame.Pong) => (None, Some(f))
case (None, f: WebSocketFrame.Close) => (None, Some(f))
Expand All @@ -115,5 +109,5 @@ private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate
s"Unexpected WebSocket frame received during concatenation. Frame received: ${f.getClass
.getSimpleName()}, accumulator type: ${acc.map(_.getClass.getSimpleName)}"
)
}.collectAsView { case Some(f: WebSocketFrame) => f }
else s
}.collect { case Some(f: WebSocketFrame) => f }
else f
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sttp.client4.impl.ox.sse
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import sttp.client4.*
import sttp.client4.testing.HttpTest.*
import sttp.model.sse.ServerSentEvent
Expand All @@ -14,7 +13,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft

behavior of "OxServerSentEvents"

it should "parse SSEs" in supervised {
it should "parse SSEs" in {
val sseData = "text1 in line1\ntext2 in line2"
val expectedEvent = ServerSentEvent(data = Some(sseData), eventType = Some("test-event"), retry = Some(42000))
val expectedEvents =
Expand All @@ -23,7 +22,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft
.post(uri"$endpoint/sse/echo3")
.body(sseData)
.response(asInputStreamAlways { is =>
OxServerSentEvents.parse(is).take(3).toList shouldBe expectedEvents
OxServerSentEvents.parse(is).take(3).runToList() shouldBe expectedEvents
()
})
.send(backend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import ox.flow.Flow
import ox.channels.ChannelClosed
import ox.channels.Sink
import ox.channels.Source
Expand Down Expand Up @@ -62,7 +63,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w
val (wsSource, wsSink) = asSourceAndSink(ws)
wsSink.send(WebSocketFrame.text("test1"))
wsSink.error(new Exception("failed source"))
eventually(wsSource.isClosedForReceiveDetail shouldBe Some(ChannelClosed.Done))
eventually(wsSource.isClosedForReceiveDetail should matchPattern { case Some(ChannelClosed.Error(_)) => })
})
.send(backend)
}
Expand Down Expand Up @@ -117,7 +118,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w
.response(asWebSocket { ws =>
val (wsSource, wsSink) = asSourceAndSink(ws, concatenateFragmented = false)
sendText(wsSink, 1)
wsSource.take(3).toList shouldBe List(
Flow.fromSource(wsSource).take(3).runToList() shouldBe List(
WebSocketFrame.Text("echo: ", false, None),
WebSocketFrame.Text("test1", false, None),
WebSocketFrame.Text("", true, None)
Expand Down
Loading