From 60e2aa629149aecf802aa6128423acbf95c5017a Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Thu, 13 Jul 2023 15:14:52 +0200 Subject: [PATCH] Continue test-suite The dialogue-based test suite is _so_ useful for discovering inconsistencies between the server and the client API, since a _single_ description of the communication is interpreted in "dual" ways by both the server and the client. We still haven't actually _executed_ any of these tests (we're getting close to that point now), but simply writing them has already proven very useful indeed. --- demo-server/Demo/Server/Service/RouteGuide.hs | 2 +- grapesy.cabal | 20 +- src/Network/GRPC/Client.hs | 1 + src/Network/GRPC/Client/Binary.hs | 14 +- src/Network/GRPC/Server.hs | 5 +- src/Network/GRPC/Server/Binary.hs | 49 ++- src/Network/GRPC/Server/Call.hs | 81 ++++- src/Network/GRPC/Server/StreamType.hs | 4 +- src/Network/GRPC/Spec.hs | 30 +- test-common/Test/Util/SOP.hs | 51 ---- test-grapesy/Test/Driver/ClientServer.hs | 12 +- test-grapesy/Test/Driver/Dialogue.hs | 285 ++++++++++++------ .../Test/Sanity/StreamingType/NonStreaming.hs | 4 +- 13 files changed, 355 insertions(+), 203 deletions(-) delete mode 100644 test-common/Test/Util/SOP.hs diff --git a/demo-server/Demo/Server/Service/RouteGuide.hs b/demo-server/Demo/Server/Service/RouteGuide.hs index 5241685f..54d5b3cd 100644 --- a/demo-server/Demo/Server/Service/RouteGuide.hs +++ b/demo-server/Demo/Server/Service/RouteGuide.hs @@ -86,7 +86,7 @@ trailersOnlyShortcut db call = do r <- recvFinalInput call let features = filter (inRectangle r . view #location) db if null features then - sendTrailersOnly call $ TrailersOnly def + sendTrailersOnly call [] else do mapM_ (sendOutput call . StreamElem) features sendTrailers call def diff --git a/grapesy.cabal b/grapesy.cabal index 4ed37000..e32df644 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -208,21 +208,21 @@ test-suite test-grapesy Test.Driver.Dialogue Test.Sanity.StreamingType.NonStreaming Test.Util.ClientServer - Test.Util.SOP build-depends: -- Internal dependencies , grapesy build-depends: -- External dependencies - , async >= 2.2 && < 2.3 - , containers >= 0.6 && < 0.7 - , contra-tracer >= 0.2 && < 0.3 - , data-default >= 0.7 && < 0.8 - , sop-core >= 0.5 && < 0.6 - , tasty >= 1.4 && < 1.5 - , tasty-hunit >= 0.10 && < 0.11 - , text >= 1.2 && < 2.1 - , tls >= 1.5 && < 1.8 + , async >= 2.2 && < 2.3 + , containers >= 0.6 && < 0.7 + , contra-tracer >= 0.2 && < 0.3 + , data-default >= 0.7 && < 0.8 + , stm >= 2.5 && < 2.6 + , tasty >= 1.4 && < 1.5 + , tasty-hunit >= 0.10 && < 0.11 + , text >= 1.2 && < 2.1 + , tls >= 1.5 && < 1.8 + , unbounded-delays >= 0.1 && < 0.2 executable demo-client import: diff --git a/src/Network/GRPC/Client.hs b/src/Network/GRPC/Client.hs index 556a9695..750f60c7 100644 --- a/src/Network/GRPC/Client.hs +++ b/src/Network/GRPC/Client.hs @@ -31,6 +31,7 @@ module Network.GRPC.Client ( , Timeout(..) , TimeoutValue(TimeoutValue, getTimeoutValue) , TimeoutUnit(..) + , timeoutToMicro -- * Ongoing calls -- diff --git a/src/Network/GRPC/Client/Binary.hs b/src/Network/GRPC/Client/Binary.hs index 3446381c..a1522113 100644 --- a/src/Network/GRPC/Client/Binary.hs +++ b/src/Network/GRPC/Client/Binary.hs @@ -23,14 +23,14 @@ import Network.GRPC.Common.StreamElem {------------------------------------------------------------------------------- Convenience wrappers using @binary@ for serialization/deserialization - We do /not/ wrap the client handlers here, because they are not a good match. - The standard client streaming handlers expect a /single/ IO action that - produces all inputs and/or a single IO action that handles all outputs, but - the raw binary protocol allows message types to be different at each point in - the communication. + Unlike for the server, we do /not/ wrap the client handlers here, because they + are not a good match. The standard client streaming handlers expect a /single/ + IO action that produces all inputs and/or a single IO action that handles all + outputs, but the raw binary protocol allows message types to be different at + each point in the communication. - These functions all have the type of the value sent or received as the - /first/ argument, to facilitate the use of type arguments. + These functions all have the type of the value sent or received as the /first/ + argument, to facilitate the use of type arguments. -------------------------------------------------------------------------------} sendInput :: forall inp serv meth. diff --git a/src/Network/GRPC/Server.hs b/src/Network/GRPC/Server.hs index 7cb235b6..ba8fb1a4 100644 --- a/src/Network/GRPC/Server.hs +++ b/src/Network/GRPC/Server.hs @@ -18,14 +18,11 @@ module Network.GRPC.Server ( -- ** Protocol specific wrappers , recvFinalInput - , recvNextInput , sendFinalOutput , sendNextOutput , sendTrailers -- ** Low-level API - , ProperTrailers(..) - , TrailersOnly(..) , recvInputSTM , sendOutputSTM , initiateResponse @@ -105,7 +102,7 @@ handleRequest handlers conn = do forwardException :: Call rpc -> SomeException -> IO () forwardException call err = handle ignoreExceptions $ - sendTrailers call trailers + sendProperTrailers call trailers where trailers :: ProperTrailers trailers diff --git a/src/Network/GRPC/Server/Binary.hs b/src/Network/GRPC/Server/Binary.hs index 814956dd..f66263a5 100644 --- a/src/Network/GRPC/Server/Binary.hs +++ b/src/Network/GRPC/Server/Binary.hs @@ -4,8 +4,13 @@ -- -- import Network.GRPC.Server.Binary qualified as Binary module Network.GRPC.Server.Binary ( + -- | Convenience wrappers using @binary@ for serialization/deserialization + sendOutput + , sendFinalOutput + , recvInput + , recvFinalInput -- * Streaming types - mkNonStreaming + , mkNonStreaming , mkClientStreaming , mkServerStreaming , mkBiDiStreaming @@ -14,9 +19,46 @@ module Network.GRPC.Server.Binary ( import Control.Monad.Catch import Data.Binary -import Network.GRPC.Common.StreamType qualified as StreamType import Network.GRPC.Common.Binary +import Network.GRPC.Common.CustomMetadata (CustomMetadata) import Network.GRPC.Common.StreamElem +import Network.GRPC.Common.StreamType qualified as StreamType +import Network.GRPC.Server (Call) +import Network.GRPC.Server qualified as Server + +{------------------------------------------------------------------------------- + Convenience wrapers using @binary@ for serialization/deserialization +-------------------------------------------------------------------------------} + +sendOutput :: + Binary a + => Call (BinaryRpc serv meth) + -> StreamElem [CustomMetadata] a + -> IO () +sendOutput call out = + Server.sendOutput call (encode <$> out) + +sendFinalOutput :: + Binary a + => Call (BinaryRpc serv meth) + -> (a, [CustomMetadata]) + -> IO () +sendFinalOutput call (out, trailers) = + Server.sendFinalOutput call (encode out, trailers) + +recvInput :: + Binary a + => Call (BinaryRpc serv meth) + -> IO (StreamElem () a) +recvInput call = do + Server.recvInput call >>= traverse decodeOrThrow + +recvFinalInput :: + Binary a + => Call (BinaryRpc serv meth) + -> IO a +recvFinalInput call = + Server.recvFinalInput call >>= decodeOrThrow {------------------------------------------------------------------------------- Handlers for specific streaming types @@ -61,4 +103,5 @@ mkBiDiStreaming :: forall m serv meth. ) -> StreamType.BiDiStreamingHandler m (BinaryRpc serv meth) mkBiDiStreaming f = StreamType.mkBiDiStreaming $ \recv send -> - f (recv >>= traverse decodeOrThrow) (send . encode) \ No newline at end of file + f (recv >>= traverse decodeOrThrow) (send . encode) + diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs index 4bac343c..e14569e9 100644 --- a/src/Network/GRPC/Server/Call.hs +++ b/src/Network/GRPC/Server/Call.hs @@ -17,7 +17,6 @@ module Network.GRPC.Server.Call ( -- ** Protocol specific wrappers , recvFinalInput - , recvNextInput , sendFinalOutput , sendNextOutput , sendTrailers @@ -27,6 +26,9 @@ module Network.GRPC.Server.Call ( , sendOutputSTM , initiateResponse , sendTrailersOnly + + -- ** Internal API + , sendProperTrailers ) where import Control.Concurrent.STM @@ -227,7 +229,12 @@ recvInput :: forall rpc. Call rpc -> IO (StreamElem () (Input rpc)) recvInput = atomically . recvInputSTM -- | Send RPC output to the client -sendOutput :: Call rpc -> StreamElem ProperTrailers (Output rpc) -> IO () +-- +-- This will send a @grpc-status@ of @0@ to the client; for anything else (i.e., +-- to indicate something went wrong), the server handler should throw a +-- 'GrpcException' (the @grapesy@ client API treats this the same way: a +-- @grpc-status@ other than @0@ will be raised as a 'GrpcException'). +sendOutput :: Call rpc -> StreamElem [CustomMetadata] (Output rpc) -> IO () sendOutput call msg = do _updated <- initiateResponse call atomically $ sendOutputSTM call msg @@ -287,12 +294,19 @@ recvInputSTM Call{callChannel} = -- You /MUST/ call 'initiateResponse' before calling 'sendOutputSTM'; throws -- 'ResponseNotInitiated' otherwise. This is a low-level API; most users can use -- 'sendOutput' instead. -sendOutputSTM :: Call rpc -> StreamElem ProperTrailers (Output rpc) -> STM () +sendOutputSTM :: Call rpc -> StreamElem [CustomMetadata] (Output rpc) -> STM () sendOutputSTM Call{callChannel, callResponseKickoff} msg = do mKickoff <- tryReadTMVar callResponseKickoff case mKickoff of - Just _ -> Session.send callChannel msg + Just _ -> Session.send callChannel (first mkTrailers msg) Nothing -> throwSTM $ ResponseNotInitiated + where + mkTrailers :: [CustomMetadata] -> ProperTrailers + mkTrailers metadata = ProperTrailers { + trailerGrpcStatus = GrpcOk + , trailerGrpcMessage = Nothing + , trailerMetadata = metadata + } -- | Initiate the response -- @@ -303,14 +317,40 @@ initiateResponse :: Call rpc -> IO Bool initiateResponse Call{callResponseKickoff} = atomically $ tryPutTMVar callResponseKickoff KickoffRegular --- | TODO: Docs and test +-- | Use the gRPC @Trailers-Only@ case for non-error responses +-- +-- Under normal circumstances a gRPC server will respond to the client with +-- an initial set of headers, than zero or more messages, and finally a set of +-- trailers. When there /are/ no messages, this /can/ be collapsed into a single +-- set of trailers (or headers, depending on your point of view); the gRPC +-- specification refers to this as the @Trailers-Only@ case. It mandates: +-- +-- > Most responses are expected to have both headers and trailers but +-- > Trailers-Only is permitted for calls that produce an immediate error. +-- +-- In @grapesy@, if a server handler throws a 'GrpcException', we will make use +-- of this @Trailers-Only@ case if applicable, as per the specification. +-- +-- /However/, some servers make use of @Trailers-Only@ also in non-error cases. +-- For example, the @listFeatures@ handler in the official Python route guide +-- example server will use @Trailers-Only@ if there are no features to report. +-- Since this is not conform the gRPC specification, we do not do this in +-- @grapesy@ by default, but we make the option available through +-- 'sendTrailersOnly'. -- -- Throws 'ResponseAlreadyInitiated' if the response has already been initiated. -sendTrailersOnly :: Call rpc -> TrailersOnly -> IO () -sendTrailersOnly Call{callResponseKickoff} trailers = do +sendTrailersOnly :: Call rpc -> [CustomMetadata] -> IO () +sendTrailersOnly Call{callResponseKickoff} metadata = do updated <- atomically $ tryPutTMVar callResponseKickoff $ KickoffTrailersOnly trailers unless updated $ throwIO ResponseAlreadyInitiated + where + trailers :: TrailersOnly + trailers = TrailersOnly $ ProperTrailers { + trailerGrpcStatus = GrpcOk + , trailerGrpcMessage = Nothing + , trailerMetadata = metadata + } data ResponseKickoffException = ResponseAlreadyInitiated @@ -341,13 +381,10 @@ recvFinalInput call@Call{} = do FinalElem inp' _ -> throwIO $ TooManyInputs @rpc inp' StreamElem inp' -> throwIO $ TooManyInputs @rpc inp' -recvNextInput :: Call rpc -> IO (StreamElem () (Input rpc)) -recvNextInput call = recvInput call - -- | Send final output -- -- See also 'sendTrailers'. -sendFinalOutput :: Call rpc -> (Output rpc, ProperTrailers) -> IO () +sendFinalOutput :: Call rpc -> (Output rpc, [CustomMetadata]) -> IO () sendFinalOutput call = sendOutput call . uncurry FinalElem -- | Send the next output @@ -362,5 +399,25 @@ sendNextOutput call = sendOutput call . StreamElem -- this (or 'sendFinalOutput') even when there is no special information to be -- included in the trailers (in this case, you can use the 'Default' instance -- for 'ProperTrailers'). -sendTrailers :: Call rpc -> ProperTrailers -> IO () +sendTrailers :: Call rpc -> [CustomMetadata] -> IO () sendTrailers call = sendOutput call . NoMoreElems + +{------------------------------------------------------------------------------- + Internal API +-------------------------------------------------------------------------------} + +-- | Send 'ProperTrailers' +-- +-- This function is not part of the public API: we use it the top-level +-- exception handler in "Network.GRPC.Server" to forward exceptions in server +-- handlers to the client. +-- +-- If no messages have been sent yet, we make use of the @Trailers-Only@ case. +sendProperTrailers :: Call rpc -> ProperTrailers -> IO () +sendProperTrailers Call{callResponseKickoff, callChannel} trailers = do + updated <- atomically $ tryPutTMVar callResponseKickoff $ + KickoffTrailersOnly (TrailersOnly trailers) + unless updated $ + -- If we didn't update, then the response has already been initiated and + -- we cannot make use of the Trailers-Only case. + atomically $ Session.send callChannel (NoMoreElems trailers) diff --git a/src/Network/GRPC/Server/StreamType.hs b/src/Network/GRPC/Server/StreamType.hs index bfa74cb0..1f9e97fe 100644 --- a/src/Network/GRPC/Server/StreamType.hs +++ b/src/Network/GRPC/Server/StreamType.hs @@ -51,7 +51,7 @@ instance StreamingRpcHandler NonStreamingHandler where instance StreamingRpcHandler ClientStreamingHandler where streamingRpcHandler proxy (UnsafeClientStreamingHandler h) = mkRpcHandler proxy $ \call -> do - out <- h (liftIO $ recvNextInput call) + out <- h (liftIO $ recvInput call) liftIO $ sendFinalOutput call (out, def) instance StreamingRpcHandler ServerStreamingHandler where @@ -64,7 +64,7 @@ instance StreamingRpcHandler ServerStreamingHandler where instance StreamingRpcHandler BiDiStreamingHandler where streamingRpcHandler proxy (UnsafeBiDiStreamingHandler h) = mkRpcHandler proxy $ \call -> do - h (liftIO $ recvNextInput call) + h (liftIO $ recvInput call) (liftIO . sendNextOutput call) liftIO $ sendTrailers call def diff --git a/src/Network/GRPC/Spec.hs b/src/Network/GRPC/Spec.hs index 444315f9..43b19684 100644 --- a/src/Network/GRPC/Spec.hs +++ b/src/Network/GRPC/Spec.hs @@ -8,6 +8,7 @@ module Network.GRPC.Spec ( , Timeout(..) , TimeoutValue(TimeoutValue, getTimeoutValue) , TimeoutUnit(..) + , timeoutToMicro -- * Inputs (message sent to the peer) , RequestHeaders(..) , IsFinal(..) @@ -102,6 +103,27 @@ data TimeoutUnit = | Nanosecond deriving stock (Show, Eq) +-- | Translate 'Timeout' to microseconds +-- +-- For 'Nanosecond' timeout we round up. +timeoutToMicro :: Timeout -> Integer +timeoutToMicro = \case + Timeout Hour (TimeoutValue n) -> mult n $ 1 * 1_000 * 1_000 * 60 * 24 + Timeout Minute (TimeoutValue n) -> mult n $ 1 * 1_000 * 1_000 * 60 + Timeout Second (TimeoutValue n) -> mult n $ 1 * 1_000 * 1_000 + Timeout Millisecond (TimeoutValue n) -> mult n $ 1 * 1_000 + Timeout Microsecond (TimeoutValue n) -> mult n $ 1 + Timeout Nanosecond (TimeoutValue n) -> nano n + where + mult :: Word -> Integer -> Integer + mult n m = fromIntegral n * m + + nano :: Word -> Integer + nano n = fromIntegral $ + mu + if n' == 0 then 0 else 1 + where + (mu, n') = divMod n 1_000 + {------------------------------------------------------------------------------- Inputs (message sent to the peer) -------------------------------------------------------------------------------} @@ -164,14 +186,6 @@ data ProperTrailers = ProperTrailers { deriving stock (GHC.Generic) deriving anyclass (SOP.Generic, SOP.HasDatatypeInfo) --- | The 'Default' corresponds to a successful response -instance Default ProperTrailers where - def = ProperTrailers { - trailerGrpcStatus = GrpcOk - , trailerGrpcMessage = Nothing - , trailerMetadata = [] - } - -- | Trailers sent in the gRPC Trailers-Only case -- -- In the current version of the spec, the information in 'TrailersOnly' is diff --git a/test-common/Test/Util/SOP.hs b/test-common/Test/Util/SOP.hs deleted file mode 100644 index 7cd4c94a..00000000 --- a/test-common/Test/Util/SOP.hs +++ /dev/null @@ -1,51 +0,0 @@ -{-# LANGUAGE PolyKinds #-} - -module Test.Util.SOP ( - -- * Indices - Ix(..) - , indexNP - -- * Updates - , Update(..) - , updateAt - , updateAtM - ) where - -import Data.Kind -import Data.SOP - -{------------------------------------------------------------------------------- - Indices --------------------------------------------------------------------------------} - -data Ix :: [k] -> k -> Type where - IZero :: Ix (a ': as) a - ISucc :: Ix as a -> Ix (a ': as) a - -deriving stock instance Show (Ix as a) - -indexNP :: Ix as a -> NP f as -> f a -indexNP IZero (x :* _ ) = x -indexNP (ISucc i) (_ :* xs) = indexNP i xs - -{------------------------------------------------------------------------------- - Update --------------------------------------------------------------------------------} - -data Update :: (k -> Type) -> k -> k -> [k] -> [k] -> Type where - UZero :: Update f a b (a ': cs) (b ': cs) - USucc :: Update f a b bs cs -> Update f a b (a ': bs) (a ': cs) - -deriving stock instance Show (Update f a b as bs) - -updateAt :: (f a -> f b) -> Update f a b as bs -> NP f as -> NP f bs -updateAt f UZero (x :* xs) = f x :* xs -updateAt f (USucc i) (x :* xs) = x :* updateAt f i xs - -updateAtM :: forall m f a b as bs. - Functor m - => (f a -> m (f b)) -> Update f a b as bs -> NP f as -> m (NP f bs) -updateAtM f = go - where - go :: forall as' bs'. Update f a b as' bs' -> NP f as' -> m (NP f bs') - go UZero (x :* xs) = ( :* xs) <$> f x - go (USucc i) (x :* xs) = (x :*) <$> go i xs diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index 8842834f..dbfef472 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -25,20 +25,20 @@ import Test.Util.ClientServer data ClientServerTest = ClientServerTest { config :: ClientServerConfig , client :: Client.Connection -> IO () - , server :: IO [Server.RpcHandler IO] + , server :: [Server.RpcHandler IO] } instance Default ClientServerTest where def = ClientServerTest { config = def , client = \_ -> return () - , server = return [] + , server = [] } -testClientServer :: ClientServerTest -> IO String -testClientServer ClientServerTest{config, client, server} = do - handlers <- server - mRes <- try $ runTestClientServer config client handlers +testClientServer :: IO ClientServerTest -> IO String +testClientServer mkTest = do + ClientServerTest{config, client, server} <- mkTest + mRes <- try $ runTestClientServer config client server case mRes of Left err | Just (testFailure :: HUnitFailure) <- fromException err diff --git a/test-grapesy/Test/Driver/Dialogue.hs b/test-grapesy/Test/Driver/Dialogue.hs index 40577d27..fa5d1a6d 100644 --- a/test-grapesy/Test/Driver/Dialogue.hs +++ b/test-grapesy/Test/Driver/Dialogue.hs @@ -1,129 +1,220 @@ module Test.Driver.Dialogue ( - RequestState(..) - , ResponseState(..) - , ClientState - , ServerState - , Interaction(..) - , Interactions(..) - , Dialogue(..) - , execDialogue + execGlobalSteps ) where import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.Thread.Delay +import Control.Exception +import Control.Monad import Data.Default -import Data.Kind -import Data.SOP +import Data.Proxy +import Network.GRPC.Client (Timeout, timeoutToMicro) import Network.GRPC.Client qualified as Client +import Network.GRPC.Client.Binary qualified as Client.Binary +import Network.GRPC.Common.Binary import Network.GRPC.Common.CustomMetadata (CustomMetadata) +import Network.GRPC.Common.StreamElem (StreamElem) import Network.GRPC.Server qualified as Server -import Network.GRPC.Common.Binary +import Network.GRPC.Server.Binary qualified as Server.Binary import Test.Driver.ClientServer -import Test.Util.SOP {------------------------------------------------------------------------------- - Dialogue + RPC -------------------------------------------------------------------------------} -type DialogueRpc = BinaryRpc "binary" "dialogue" +type TestRpc = BinaryRpc "binary" "test" -data RequestState = - RequestStarted - | RequestEnded +{------------------------------------------------------------------------------- + Barrier -type ClientState = [RequestState] + These are concurrent tests, but it can be difficult to pinpoint bugs if the + ordering of actions is not determistic. We therefore add specific "barrier" + points, which allow to shrink tests towards more deterministic. +-------------------------------------------------------------------------------} -data RequestData :: RequestState -> Type where - RequestOpen :: Client.Call DialogueRpc -> RequestData RequestStarted - RequestClosed :: RequestData RequestEnded +newtype Barrier = Barrier Int + deriving stock (Show, Eq, Ord) -data ResponseState = - ResponseStarted - | ResponseEnded +newBarrier :: IO (TVar Barrier) +newBarrier = newTVarIO (Barrier 0) -type ServerState = [ResponseState] +waitForBarrier :: TVar Barrier -> Barrier -> IO () +waitForBarrier var target = atomically $ do + actual <- readTVar var + when (actual < target) retry -data Interaction :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where - StartRequest :: - Maybe Client.Timeout - -> [CustomMetadata] - -> Interaction '(reqs, resps) '(RequestStarted ': reqs, resps) - CloseRequest :: - Update RequestData RequestStarted RequestEnded reqs reqs' - -> Interaction '(reqs, resps) '(reqs', resps) +{------------------------------------------------------------------------------- + Test failures +-------------------------------------------------------------------------------} -deriving stock instance Show (Interaction st st') +data TestFailure = + -- | Thrown by the server when an unexpected new RPC is initiated + UnexpectedRequest -data Interactions :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where - Done :: Interactions '(client0, server0) '(client0, server0) - Step :: Interaction '(client0, server0) '(client1, server1) - -> Interactions '(client1, server1) '(client2, server2) - -> Interactions '(client0, server0) '(client2, server2) + -- | Server received an unexpected value + | ServerUnexpected ReceivedUnexpected -deriving stock instance Show (Interactions st st') + -- | Client received an unexpected value + | ClientUnexpected ReceivedUnexpected + deriving stock (Show) + deriving anyclass (Exception) -data Dialogue :: Type where - Dialogue :: Interactions '( '[] , '[] ) '(reqs , resps) -> Dialogue +data ReceivedUnexpected = forall a. Show a => ReceivedUnexpected { + expected :: a + , received :: a + } -deriving instance Show Dialogue +deriving stock instance Show ReceivedUnexpected -execDialogue :: Dialogue -> ClientServerTest -execDialogue (Dialogue interactions) = def { - client = \conn -> - clientSide conn interactions - , server = do - interactionsVar <- newMVar interactions - return [serverSide interactionsVar] - } +{------------------------------------------------------------------------------- + Single channel + + TODO: Exceptions (both GrpcException and general other exceptions). + + TODO: We should test that the Trailers-Only case gets triggered if no messages + were exchanged before the exception (not sure this is observable without + Wireshark..?). +-------------------------------------------------------------------------------} + +data LocalStep = + StartResponse [CustomMetadata] + | LocalBarrier Barrier + | ServerSleep Timeout + | ClientToServer (StreamElem () Int) + | ServerToClient (StreamElem [CustomMetadata] Int) + deriving stock (Show) + +type LocalSteps = [LocalStep] + +{------------------------------------------------------------------------------- + Many channels (birds-eye view) +-------------------------------------------------------------------------------} + +data GlobalStep = + Spawn (Maybe Timeout) [CustomMetadata] LocalSteps + | GlobalBarrier Barrier + deriving stock (Show) + +type GlobalSteps = [GlobalStep] + +{------------------------------------------------------------------------------- + Client-side interpretation +-------------------------------------------------------------------------------} + +clientGlobal :: TVar Barrier -> Client.Connection -> GlobalSteps -> IO () +clientGlobal barrierVar conn = + mapM_ go where - clientSide :: - Client.Connection - -> Interactions '( '[], '[] ) '(reqs, resps) - -> IO () - clientSide conn = go Nil + go :: GlobalStep -> IO () + go (GlobalBarrier b) = do + waitForBarrier barrierVar b + go (Spawn timeout metadata localSteps) = + void $ forkIO $ do + call <- Client.startRPC conn params (Proxy @TestRpc) + clientLocal barrierVar call localSteps where - go :: NP RequestData reqs -> Interactions '(reqs, a) '(reqs', b) -> IO () - go _ Done = - return () - go reqs (Step (StartRequest timeout metadata) is) = do - req <- aux - go (req :* reqs) is - where - aux :: IO (RequestData 'RequestStarted) - aux = - RequestOpen <$> - Client.startRPC conn params (Proxy @DialogueRpc) - - params :: Client.CallParams - params = Client.CallParams { - callTimeout = timeout - , callRequestMetadata = metadata - } - go reqs (Step (CloseRequest ix) is) = do - reqs' <- updateAtM aux ix reqs - go reqs' is - where - aux :: RequestData 'RequestStarted -> IO (RequestData 'RequestEnded) - aux (RequestOpen call) = do - Client.abortRPC call - return $ RequestClosed - - serverSide :: - MVar (Interactions '( '[], '[] ) '(reqs, resps)) - -> Server.RpcHandler IO - serverSide _interactionsVar = undefined {- (Server.mkRpcHandler (Proxy @DialogueRpc) go) { - Server.handlerMetadata = \_requestMetadata -> - -- TODO: Hmm this is weird. we don't get to control /when/ the - -- response is sent back; what if we want the response metadata - -- to depend on some input /messages/ and not just the input - -- metadata? This is a problem with the API - undefined - } - where - go :: Server.Call DialogueRpc -> IO () - go _call = undefined + params :: Client.CallParams + params = Client.CallParams { + callTimeout = timeout + , callRequestMetadata = metadata + } + +clientLocal :: TVar Barrier -> Client.Call TestRpc -> LocalSteps -> IO () +clientLocal barrierVar call localSteps = do + mapM_ go localSteps + Client.abortRPC call + where + go :: LocalStep -> IO () + go (LocalBarrier barrier) = + waitForBarrier barrierVar barrier + go (StartResponse expectedMetadata) = do + receivedMetadata <- atomically $ Client.recvResponseMetadata call + unless (receivedMetadata == expectedMetadata) $ + throwIO $ ClientUnexpected $ ReceivedUnexpected { + expected = expectedMetadata + , received = receivedMetadata + } + go (ServerSleep _) = + -- Server delays are not (directly) observable in the client. The only + -- things we should be able to observe is if the client-specified + -- timeout is exceeded. + return () + go (ClientToServer x) = + Client.Binary.sendInput call x + go (ServerToClient expectedElem) = do + receivedElem <- Client.Binary.recvOutput call + unless (receivedElem == expectedElem) $ + throwIO $ ClientUnexpected $ ReceivedUnexpected { + expected = expectedElem + , received = receivedElem + } - -} +{------------------------------------------------------------------------------- + Server-side interpretation + + The server-side is slightly different, since the infrastructure spawns + threads on our behalf (one for each incoming RPC). +-------------------------------------------------------------------------------} + +serverGlobal :: TVar Barrier -> MVar GlobalSteps -> Server.Call TestRpc -> IO () +serverGlobal barrierVar globalStepsVar call = do + -- The client will only start new calls from a /single/ thread, so the order + -- that these come in is determinstic. Here we peel off the next + -- 'LocalSteps', pending barriers perhaps, whilst holding the lock, to + -- ensure the same determinism server-side. + mapM_ go =<< modifyMVar globalStepsVar getNextSteps + where + getNextSteps :: GlobalSteps -> IO (GlobalSteps, LocalSteps) + getNextSteps [] = + throwIO $ UnexpectedRequest + getNextSteps (GlobalBarrier barrier : global') = do + waitForBarrier barrierVar barrier + getNextSteps global' + getNextSteps (Spawn _timeout expectedMetadata localSteps : global') = do + -- We don't care about the timeout the client sets; if the server takes + -- too long (see 'ServerSleep'), the framework should kill the server + -- and the /client/ will check that it gets the expected exception. + receivedMetadata <- Server.getRequestMetadata call + unless (receivedMetadata == expectedMetadata) $ + throwIO $ ServerUnexpected $ ReceivedUnexpected { + expected = expectedMetadata + , received = receivedMetadata + } + return (global', localSteps) + + go :: LocalStep -> IO () + go (LocalBarrier barrier) = + waitForBarrier barrierVar barrier + go (StartResponse metadata) = do + Server.setResponseMetadata call metadata + void $ Server.initiateResponse call + go (ServerSleep timeout) = do + delay $ timeoutToMicro timeout + go (ClientToServer expectedElem) = do + receivedElem <- Server.Binary.recvInput call + unless (expectedElem == receivedElem) $ + throwIO $ ServerUnexpected $ ReceivedUnexpected { + expected = expectedElem + , received = receivedElem + } + go (ServerToClient x) = do + Server.Binary.sendOutput call x + +{------------------------------------------------------------------------------- + Top-level +-------------------------------------------------------------------------------} +execGlobalSteps :: GlobalSteps -> IO ClientServerTest +execGlobalSteps steps = do + barrierVar <- newBarrier + globalStepsVar <- newMVar steps + return $ def { + client = \conn -> clientGlobal barrierVar conn steps + , server = [ Server.mkRpcHandler (Proxy @TestRpc) $ + serverGlobal barrierVar globalStepsVar + ] + } diff --git a/test-grapesy/Test/Sanity/StreamingType/NonStreaming.hs b/test-grapesy/Test/Sanity/StreamingType/NonStreaming.hs index 51689079..74f488ca 100644 --- a/test-grapesy/Test/Sanity/StreamingType/NonStreaming.hs +++ b/test-grapesy/Test/Sanity/StreamingType/NonStreaming.hs @@ -73,14 +73,14 @@ tests = testGroup "Test.Sanity.StreamingType.NonStreaming" [ type BinaryIncrement = BinaryRpc "binary" "increment" test_increment :: ClientServerConfig -> IO String -test_increment config = testClientServer $ def { +test_increment config = testClientServer $ return def { config , client = \conn -> do Client.withRPC conn def (Proxy @BinaryIncrement) $ \call -> do Binary.sendFinalInput @Word8 call 1 resp <- fst <$> Binary.recvFinalOutput @Word8 call assertEqual "" 2 $ resp - , server = return [ + , server = [ streamingRpcHandler (Proxy @BinaryIncrement) $ Binary.mkNonStreaming $ \(n :: Word8) -> return (succ n)