Skip to content

Commit

Permalink
Merge pull request #1480 from UBS-IB/approvedpush_20240909_01
Browse files Browse the repository at this point in the history
#1434 initialising lifecycle at start up
  • Loading branch information
heswell authored Sep 10, 2024
2 parents b92b210 + 7cebe08 commit 09ef16f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ public abstract class WebSocketApiJavaTestBase {
protected String tokenId;
protected String sessionId;

protected Clock clock = new DefaultClock();
protected LifecycleContainer lifecycle = new LifecycleContainer(clock);
protected TableDefContainer tableDefContainer = new TableDefContainer();
protected Clock clock;
protected LifecycleContainer lifecycle;
protected TableDefContainer tableDefContainer;

@BeforeAll
public void setUp() {
clock = new DefaultClock();
lifecycle = new LifecycleContainer(clock);
tableDefContainer = new TableDefContainer();

vuuClient = testStartUp();
tokenId = vuuClient.createAuthToken();
var sessionOption = OptionConverters.toJava(vuuClient.login(tokenId, "testUser"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
override def process(msg: ViewPortRpcCall)(ctx: RequestContext): Option[ViewServerMessage] = {
Try(viewPortContainer.callRpcService(msg.vpId, msg.rpcName, msg.params, msg.namedParams, ctx.session)(ctx)) match {
case Success(action) =>
logger.info("Processed VP RPC call" + msg)
logger.info("Processed VP RPC call " + msg)
vsMsg(ViewPortRpcResponse(msg.vpId, msg.rpcName, action))(ctx)
case Failure(e) =>
logger.info("Failed to process VP RPC call", e)
Expand Down Expand Up @@ -418,15 +418,15 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
private def handleViewPortRpcRequest(msg: RpcRequest, viewPortId: String, ctx: RequestContext) = {
val response = Try(viewPortContainer.handleRpcRequest(viewPortId, msg.rpcName, msg.params)(ctx)) match {
case Success(functionResult) =>
logger.info(s"Processed VP RPC call ${ctx.requestId}" + msg)
logger.info(s"Processed Rpc Request ${ctx.requestId} " + msg)
functionResult match {
case RpcFunctionSuccess(data) =>
RpcResponseNew(rpcName = msg.rpcName, result = RpcSuccessResult(data), NoneAction())
case RpcFunctionFailure(errorCode, error, exception) =>
createErrorRpcResponse(msg, error)
}
case Failure(e) =>
logger.info(s"Failed to process VP RPC call ${ctx.requestId}", e)
logger.info(s"Failed to process Rpc Request ${ctx.requestId}", e)
createErrorRpcResponse(msg, e.toString)
}
vsMsg(response)(ctx)
Expand Down
6 changes: 4 additions & 2 deletions vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri

}

logger.info("Websocket should be up.")
logger.info(s"[WSClient] Websocket on ${ws.uri} should be up.")
}

override def doStop(): Unit = {}
override def doStop(): Unit = {
logger.info(s"[WSClient] Websocket on ${ws.uri} stopping.")
}

override def doInitialize(): Unit = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}

abstract class WebSocketApiTestBase extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers {

implicit val timeProvider: Clock = new DefaultClock
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
implicit val tableDefContainer: TableDefContainer = new TableDefContainer
implicit var timeProvider: Clock = _
implicit var lifecycle: LifecycleContainer = _
implicit var tableDefContainer: TableDefContainer = _
var viewServerClient: ViewServerClient = _
var vuuClient: TestVuuClient = _
var tokenId: String = _
var sessionId: String = _

override def beforeAll(): Unit = {
timeProvider = new DefaultClock
lifecycle = new LifecycleContainer
tableDefContainer = new TableDefContainer

vuuClient = testStartUp()

tokenId = vuuClient.createAuthToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package org.finos.vuu.wsapi.helpers
import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.{VuuClientConnectionOptions, VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions}
import org.finos.vuu.core.module.{TableDefContainer, ViewServerModule}
import org.finos.vuu.net.{AlwaysHappyLoginValidator, ViewServerClient, WebSocketViewServerClient}
import org.finos.vuu.core._
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.http.VuuHttp2ServerOptions
import org.finos.vuu.net.json.JsonVsSerializer
import org.finos.vuu.net.ws.WebSocketClient
import org.finos.vuu.net.{AlwaysHappyLoginValidator, ViewServerClient, WebSocketViewServerClient}

class TestStartUp(moduleFactoryFunc: () => ViewServerModule)(
implicit val timeProvider: Clock,
implicit val lifecycle: LifecycleContainer,
implicit val tableDefContainer: TableDefContainer) {
implicit val tableDefContainer: TableDefContainer){


def startServerAndClient(): TestVuuClient = {
Expand Down Expand Up @@ -63,5 +63,4 @@ class TestStartUp(moduleFactoryFunc: () => ViewServerModule)(

vuuClient
}

}
46 changes: 33 additions & 13 deletions vuu/src/test/scala/org/finos/vuu/wsapi/helpers/TestVuuClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.client.messages.{RequestId, TokenId}
import org.finos.vuu.net._
import org.scalatest.concurrent.TimeLimits.failAfter
import org.scalatest.concurrent.{Signaler, ThreadSignaler}
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.language.postfixOps
import scala.reflect.ClassTag

class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging{
class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging {

type SessionId = String
type Token = String
Expand All @@ -29,28 +31,46 @@ class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging{
def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] =
awaitForMsg.map(msg => msg.body.asInstanceOf[T])


implicit val signaler: Signaler = ThreadSignaler

def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = {
failAfter(timeout){
val msg = vsClient.awaitMsg
if (msg != null) { //null indicate error or timeout
if (isExpectedBodyType(t, msg))
Some(msg)
else
awaitForMsg
failAfter(timeout) {
getNextMessageUntilBodyIsExpectedType()
}
}

@tailrec
private def getNextMessageUntilBodyIsExpectedType[T <: AnyRef]()(implicit t: ClassTag[T]): Option[ViewServerMessage] = {
val msg = vsClient.awaitMsg
if (msg != null) { //null indicate error or timeout
if (isExpectedBodyType(t, msg)) {
Some(msg)
} else {
logger.info(s"Received ${msg.body.getClass} but was expecting ${t.runtimeClass}. Dismissing message and waiting for next one.")
getNextMessageUntilBodyIsExpectedType()
}
else
None
}
else {
logger.info(s"Did not receive any message in response. Try waiting again.")
getNextMessageUntilBodyIsExpectedType()
}
}

val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap

def awaitForResponse(requestId: String): Option[ViewServerMessage] = {
failAfter(timeout) {
getNextMessageUntilResponseForRequestId(requestId)
}
}

@tailrec
private def getNextMessageUntilResponseForRequestId(requestId: String): Option[ViewServerMessage] = {
lookupFromReceivedResponses(requestId)
.map(msg => {
logger.info(s"Found response for $requestId in cache")
return Some(msg)
Some(msg)
})

val msg = vsClient.awaitMsg
Expand All @@ -61,11 +81,11 @@ class TestVuuClient(vsClient: ViewServerClient) extends StrictLogging{
} else {
responsesMap.put(msg.requestId, msg)
logger.info(s"Added response for $requestId in cache")
awaitForResponse(requestId)
getNextMessageUntilResponseForRequestId(requestId)
}
else {
logger.error(s"Failed or timed out while waiting for response for $requestId")
None
getNextMessageUntilResponseForRequestId(requestId)
}
}

Expand Down

0 comments on commit 09ef16f

Please sign in to comment.