Skip to content

Commit

Permalink
finos#1434 making sure web socket test times out if expected message …
Browse files Browse the repository at this point in the history
…is not returned in time
  • Loading branch information
naleeha committed Sep 9, 2024
1 parent 2c683ac commit 7cebe08
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
override def process(msg: ViewPortRpcCall)(ctx: RequestContext): Option[ViewServerMessage] = {
Try(viewPortContainer.callRpcService(msg.vpId, msg.rpcName, msg.params, msg.namedParams, ctx.session)(ctx)) match {
case Success(action) =>
logger.info("Processed VP RPC call" + msg)
logger.info("Processed VP RPC call " + msg)
vsMsg(ViewPortRpcResponse(msg.vpId, msg.rpcName, action))(ctx)
case Failure(e) =>
logger.info("Failed to process VP RPC call", e)
Expand Down Expand Up @@ -418,15 +418,15 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
private def handleViewPortRpcRequest(msg: RpcRequest, viewPortId: String, ctx: RequestContext) = {
val response = Try(viewPortContainer.handleRpcRequest(viewPortId, msg.rpcName, msg.params)(ctx)) match {
case Success(functionResult) =>
logger.info(s"Processed VP RPC call ${ctx.requestId}" + msg)
logger.info(s"Processed Rpc Request ${ctx.requestId} " + msg)
functionResult match {
case RpcFunctionSuccess(data) =>
RpcResponseNew(rpcName = msg.rpcName, result = RpcSuccessResult(data), NoneAction())
case RpcFunctionFailure(errorCode, error, exception) =>
createErrorRpcResponse(msg, error)
}
case Failure(e) =>
logger.info(s"Failed to process VP RPC call ${ctx.requestId}", e)
logger.info(s"Failed to process Rpc Request ${ctx.requestId}", e)
createErrorRpcResponse(msg, e.toString)
}
vsMsg(response)(ctx)
Expand Down
6 changes: 4 additions & 2 deletions vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri

}

logger.info("Websocket should be up.")
logger.info(s"[WSClient] Websocket on ${ws.uri} should be up.")
}

override def doStop(): Unit = {}
override def doStop(): Unit = {
logger.info(s"[WSClient] Websocket on ${ws.uri} stopping.")
}

override def doInitialize(): Unit = {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package org.finos.vuu.wsapi.helpers

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.{VuuClientConnectionOptions, VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions}
import org.finos.vuu.core.module.{TableDefContainer, ViewServerModule}
import org.finos.vuu.net.{AlwaysHappyLoginValidator, ViewServerClient, WebSocketViewServerClient}
import org.finos.vuu.core._
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.http.VuuHttp2ServerOptions
import org.finos.vuu.net.json.JsonVsSerializer
import org.finos.vuu.net.ws.WebSocketClient
import org.finos.vuu.net.{AlwaysHappyLoginValidator, ViewServerClient, WebSocketViewServerClient}

class TestStartUp(moduleFactoryFunc: () => ViewServerModule)(
implicit val timeProvider: Clock,
implicit val lifecycle: LifecycleContainer,
implicit val tableDefContainer: TableDefContainer) extends StrictLogging {
implicit val tableDefContainer: TableDefContainer){


def startServerAndClient(): TestVuuClient = {
Expand Down Expand Up @@ -62,7 +61,6 @@ class TestStartUp(moduleFactoryFunc: () => ViewServerModule)(
//lifecycle registration is done in constructor of service classes, so sequence of create is important
lifecycle.start()

logger.info(s"[TEST CLIENT] Starting vuu client at ${client.uri}")
vuuClient
}
}
44 changes: 32 additions & 12 deletions vuu/src/test/scala/org/finos/vuu/wsapi/helpers/TestVuuClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.client.messages.{RequestId, TokenId}
import org.finos.vuu.net._
import org.scalatest.concurrent.TimeLimits.failAfter
import org.scalatest.concurrent.{Signaler, ThreadSignaler}
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.language.postfixOps
import scala.reflect.ClassTag

Expand All @@ -29,28 +31,46 @@ class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging {
def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] =
awaitForMsg.map(msg => msg.body.asInstanceOf[T])


implicit val signaler: Signaler = ThreadSignaler

def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = {
failAfter(timeout){
val msg = vsClient.awaitMsg
if (msg != null) { //null indicate error or timeout
if (isExpectedBodyType(t, msg))
Some(msg)
else
awaitForMsg
failAfter(timeout) {
getNextMessageUntilBodyIsExpectedType()
}
}

@tailrec
private def getNextMessageUntilBodyIsExpectedType[T <: AnyRef]()(implicit t: ClassTag[T]): Option[ViewServerMessage] = {
val msg = vsClient.awaitMsg
if (msg != null) { //null indicate error or timeout
if (isExpectedBodyType(t, msg)) {
Some(msg)
} else {
logger.info(s"Received ${msg.body.getClass} but was expecting ${t.runtimeClass}. Dismissing message and waiting for next one.")
getNextMessageUntilBodyIsExpectedType()
}
else
None
}
else {
logger.info(s"Did not receive any message in response. Try waiting again.")
getNextMessageUntilBodyIsExpectedType()
}
}

val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap

def awaitForResponse(requestId: String): Option[ViewServerMessage] = {
failAfter(timeout) {
getNextMessageUntilResponseForRequestId(requestId)
}
}

@tailrec
private def getNextMessageUntilResponseForRequestId(requestId: String): Option[ViewServerMessage] = {
lookupFromReceivedResponses(requestId)
.map(msg => {
logger.info(s"Found response for $requestId in cache")
return Some(msg)
Some(msg)
})

val msg = vsClient.awaitMsg
Expand All @@ -61,11 +81,11 @@ class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging {
} else {
responsesMap.put(msg.requestId, msg)
logger.info(s"Added response for $requestId in cache")
awaitForResponse(requestId)
getNextMessageUntilResponseForRequestId(requestId)
}
else {
logger.error(s"Failed or timed out while waiting for response for $requestId")
None
getNextMessageUntilResponseForRequestId(requestId)
}
}

Expand Down

0 comments on commit 7cebe08

Please sign in to comment.