From 4427fb106f147a99898dda2b3b2f4fb8a319a75a Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Fri, 7 Jul 2023 16:56:48 +0200 Subject: [PATCH] Better server-side handling of metadata and errors --- demo-server/Demo/Server/Cmdline.hs | 11 +- .../{API/Protobuf => Service}/Greeter.hs | 16 +- .../{API/Protobuf => Service}/RouteGuide.hs | 41 ++- demo-server/Main.hs | 23 +- docs/demo-server.md | 34 +- grapesy.cabal | 5 +- src/Network/GRPC/Client/Session.hs | 19 +- src/Network/GRPC/Server.hs | 102 +++--- src/Network/GRPC/Server/Call.hs | 303 ++++++++++++++---- src/Network/GRPC/Server/Handler.hs | 29 +- src/Network/GRPC/Server/Session.hs | 21 +- src/Network/GRPC/Server/StreamType.hs | 9 +- src/Network/GRPC/Spec.hs | 8 + src/Network/GRPC/Util/Session/API.hs | 8 +- src/Network/GRPC/Util/Session/Channel.hs | 2 +- src/Network/GRPC/Util/Session/Client.hs | 4 +- src/Network/GRPC/Util/Session/Server.hs | 13 +- test-grapesy/Test/Driver/ClientServer.hs | 119 ------- test-grapesy/Test/Driver/Dialogue.hs | 129 ++++++++ 19 files changed, 582 insertions(+), 314 deletions(-) rename demo-server/Demo/Server/{API/Protobuf => Service}/Greeter.hs (81%) rename demo-server/Demo/Server/{API/Protobuf => Service}/RouteGuide.hs (63%) create mode 100644 test-grapesy/Test/Driver/Dialogue.hs diff --git a/demo-server/Demo/Server/Cmdline.hs b/demo-server/Demo/Server/Cmdline.hs index 1452325f..d607e986 100644 --- a/demo-server/Demo/Server/Cmdline.hs +++ b/demo-server/Demo/Server/Cmdline.hs @@ -15,9 +15,10 @@ import Network.GRPC.Server.Run -------------------------------------------------------------------------------} data Cmdline = Cmdline { - cmdInsecure :: Maybe InsecureConfig - , cmdSecure :: Maybe SecureConfig - , cmdDebug :: Bool + cmdInsecure :: Maybe InsecureConfig + , cmdSecure :: Maybe SecureConfig + , cmdDebug :: Bool + , cmdTrailersOnlyShortcut :: Bool } deriving (Show) @@ -38,6 +39,10 @@ parseCmdline = Opt.long "debug" , Opt.help "Enable debug output" ]) + <*> (Opt.switch $ mconcat [ + Opt.long "trailers-only-shortcut" + , Opt.help "Use Trailers-Only even in non-error cases" + ]) parseInsecure :: Opt.Parser (Maybe InsecureConfig) parseInsecure = asum [ diff --git a/demo-server/Demo/Server/API/Protobuf/Greeter.hs b/demo-server/Demo/Server/Service/Greeter.hs similarity index 81% rename from demo-server/Demo/Server/API/Protobuf/Greeter.hs rename to demo-server/Demo/Server/Service/Greeter.hs index 9aa56682..a2cce348 100644 --- a/demo-server/Demo/Server/API/Protobuf/Greeter.hs +++ b/demo-server/Demo/Server/Service/Greeter.hs @@ -1,10 +1,11 @@ {-# LANGUAGE OverloadedLabels #-} {-# LANGUAGE OverloadedStrings #-} -module Demo.Server.API.Protobuf.Greeter (handlers) where +module Demo.Server.Service.Greeter (handlers) where import Control.Lens ((.~), (^.)) import Control.Monad +import Data.Default import Data.Function ((&)) import Data.ProtoLens import Data.ProtoLens.Labels () @@ -41,14 +42,15 @@ sayHello req = return $ defMessage & #message .~ msg sayHelloStreamReply :: RpcHandler IO sayHelloStreamReply = - (mkRpcHandler (Proxy @(Protobuf Greeter "sayHelloStreamReply")) go) { - handlerMetadata = \_reqMetadata -> return [ - AsciiHeader "initial-md" "initial-md-value" - ] - } + mkRpcHandler (Proxy @(Protobuf Greeter "sayHelloStreamReply")) go where go :: Call (Protobuf Greeter "sayHelloStreamReply") -> IO () go call = do + setResponseMetadata call [AsciiHeader "initial-md" "initial-md-value"] + + -- The client expects the metadata well before the first output + _ <- initiateResponse call + req <- recvFinalInput call let msg :: Text -> Text @@ -57,4 +59,4 @@ sayHelloStreamReply = forM_ ["0", "1", "2"] $ \i -> sendNextOutput call $ defMessage & #message .~ msg i - sendTrailers call [] + sendTrailers call def diff --git a/demo-server/Demo/Server/API/Protobuf/RouteGuide.hs b/demo-server/Demo/Server/Service/RouteGuide.hs similarity index 63% rename from demo-server/Demo/Server/API/Protobuf/RouteGuide.hs rename to demo-server/Demo/Server/Service/RouteGuide.hs index 1fe2cf1b..5241685f 100644 --- a/demo-server/Demo/Server/API/Protobuf/RouteGuide.hs +++ b/demo-server/Demo/Server/Service/RouteGuide.hs @@ -1,35 +1,45 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedLabels #-} -module Demo.Server.API.Protobuf.RouteGuide (handlers) where +module Demo.Server.Service.RouteGuide (handlers) where import Control.Lens (view, (^.)) import Control.Monad.State (StateT) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.State qualified as State +import Data.Default import Data.Map qualified as Map import Data.Maybe (fromMaybe) import Data.ProtoLens import Data.ProtoLens.Labels () +import Data.Proxy import Data.Time +import Network.GRPC.Common.StreamElem (StreamElem(..)) import Network.GRPC.Common.StreamElem qualified as StreamElem import Network.GRPC.Common.StreamType +import Network.GRPC.Server import Network.GRPC.Server.Protobuf import Network.GRPC.Server.StreamType import Proto.RouteGuide import Demo.Server.Aux.RouteGuide +import Demo.Server.Cmdline {------------------------------------------------------------------------------- Top-level -------------------------------------------------------------------------------} -handlers :: [Feature] -> Methods IO (ProtobufMethodsOf RouteGuide) -handlers db = +handlers :: Cmdline -> [Feature] -> Methods IO (ProtobufMethodsOf RouteGuide) +handlers cmdline db = Method (mkNonStreaming $ getFeature db) - $ Method (mkServerStreaming $ listFeatures db) + $ ( if cmdTrailersOnlyShortcut cmdline + then RawMethod $ mkRpcHandler + (Proxy @(Protobuf RouteGuide "listFeatures")) + (trailersOnlyShortcut db) + else Method (mkServerStreaming $ listFeatures db) + ) $ Method (mkClientStreaming $ recordRoute db) $ Method (mkBiDiStreaming $ routeChat db) $ NoMoreMethods @@ -44,7 +54,7 @@ getFeature db p = return $ fromMaybe defMessage $ featureAt db p listFeatures :: [Feature] -> Rectangle -> (Feature -> IO ()) -> IO () listFeatures db r send = mapM_ send $ filter (inRectangle r . view #location) db -recordRoute :: [Feature] -> IO (StreamElem.StreamElem () Point) -> IO RouteSummary +recordRoute :: [Feature] -> IO (StreamElem () Point) -> IO RouteSummary recordRoute db recv = do start <- getCurrentTime ps <- StreamElem.collect recv @@ -53,7 +63,7 @@ recordRoute db recv = do routeChat :: [Feature] - -> IO (StreamElem.StreamElem () RouteNote) + -> IO (StreamElem () RouteNote) -> (RouteNote -> IO ()) -> IO () routeChat _db recv send = do @@ -62,6 +72,25 @@ routeChat _db recv send = do mapM_ send $ reverse $ Map.findWithDefault [] (n ^. #location) acc return $ Map.alter (Just . (n:) . fromMaybe []) (n ^. #location) acc +{------------------------------------------------------------------------------- + Trailers-Only shortcut + + See discussion in @demo-server.md@. +-------------------------------------------------------------------------------} + +trailersOnlyShortcut :: + [Feature] + -> Call (Protobuf RouteGuide "listFeatures") + -> IO () +trailersOnlyShortcut db call = do + r <- recvFinalInput call + let features = filter (inRectangle r . view #location) db + if null features then + sendTrailersOnly call $ TrailersOnly def + else do + mapM_ (sendOutput call . StreamElem) features + sendTrailers call def + {------------------------------------------------------------------------------- Auxiliary -------------------------------------------------------------------------------} diff --git a/demo-server/Main.hs b/demo-server/Main.hs index 922448da..c16dedbb 100644 --- a/demo-server/Main.hs +++ b/demo-server/Main.hs @@ -13,9 +13,9 @@ import Network.GRPC.Server.StreamType import Demo.Common.Logging -import Demo.Server.API.Protobuf.Greeter qualified as Greeter -import Demo.Server.API.Protobuf.RouteGuide qualified as RouteGuide import Demo.Server.Cmdline +import Demo.Server.Service.Greeter qualified as Greeter +import Demo.Server.Service.RouteGuide qualified as RouteGuide import Proto.Helloworld import Proto.RouteGuide @@ -26,10 +26,13 @@ import Paths_grapesy All services -------------------------------------------------------------------------------} -services :: [Feature] -> Services IO (ProtobufServices '[Greeter, RouteGuide]) -services db = +services :: + Cmdline + -> [Feature] + -> Services IO (ProtobufServices '[Greeter, RouteGuide]) +services cmdline db = Service Greeter.handlers - $ Service (RouteGuide.handlers db) + $ Service (RouteGuide.handlers cmdline db) $ NoMoreServices {------------------------------------------------------------------------------- @@ -38,22 +41,22 @@ services db = main :: IO () main = do - cmd <- getCmdline + cmdline <- getCmdline db <- getRouteGuideDb let serverConfig :: ServerConfig serverConfig = ServerConfig { serverTracer = - if cmdDebug cmd + if cmdDebug cmdline then contramap show threadSafeTracer else nullTracer , serverInsecure = - cmdInsecure cmd + cmdInsecure cmdline , serverSecure = - cmdSecure cmd + cmdSecure cmdline } - withServer (serverParams cmd) (fromServices (services db)) $ + withServer (serverParams cmdline) (fromServices $ services cmdline db) $ runServer serverConfig getRouteGuideDb :: IO [Feature] diff --git a/docs/demo-server.md b/docs/demo-server.md index 20d10d36..a77cf682 100644 --- a/docs/demo-server.md +++ b/docs/demo-server.md @@ -72,4 +72,36 @@ grpc-repo/examples/python/route_guide$ python3 route_guide_client.py Currently server-side compression can be verified simply by running the Python hello-world client (and then looking at the communication in Wireshark), because -the server applies compression independent of whether that saves space or not. \ No newline at end of file +the server applies compression independent of whether that saves space or not. + +### Trailers-Only shortcut + +A normal gRPC response looks like + +``` + + + +``` + +If there are no messages, then this whole thing collapses to just a set of +trailers (or headers; the distinction is no longer relevant in this case); the +gRPC specification refers to this as `Trailers-Only`. The spec says that this +should _only_ be used in error cases, but in practice some servers also use this +for normal cases. For example, the Python implementation of the `ListFeatures` +method will use `Trailers-Only` in the case that the list of features is empty. + +The Protobuf-specific wrappers in `grapesy` will not use `Trailers-Only` except +in the case of errors, comform the spec; however, it is possible to use the +lower-level server API to get the behaviour exibited by the Python example +implementation. The command line flag `--trailers-only-shortcut` enables this +for the demo server. The difference in server operation can only be observed +with Wireshark; a request for a list of features in the rectangle `(0, 0, 0, 0)` +(which is empty) will + +* result in three HTTP frames in the normal case (`HEADERS`, empty `DATA` to + separate headers from trailers, and another `HEADERS` frame with the trailers) +* result in a single HTTP `HEADERS` frame when using `--trailers-only-shortcut` + +Note that this behaviour is _NOT_ conform the gRPC spec, so not all clients +may support it. diff --git a/grapesy.cabal b/grapesy.cabal index e69b6c1b..4ed37000 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -205,6 +205,7 @@ test-suite test-grapesy other-modules: Paths_grapesy Test.Driver.ClientServer + Test.Driver.Dialogue Test.Sanity.StreamingType.NonStreaming Test.Util.ClientServer Test.Util.SOP @@ -282,10 +283,10 @@ executable demo-server -threaded other-modules: Demo.Common.Logging - Demo.Server.API.Protobuf.Greeter - Demo.Server.API.Protobuf.RouteGuide Demo.Server.Aux.RouteGuide Demo.Server.Cmdline + Demo.Server.Service.Greeter + Demo.Server.Service.RouteGuide Proto.Helloworld Proto.Helloworld_Fields Proto.RouteGuide diff --git a/src/Network/GRPC/Client/Session.hs b/src/Network/GRPC/Client/Session.hs index 0572ee32..d2f254dd 100644 --- a/src/Network/GRPC/Client/Session.hs +++ b/src/Network/GRPC/Client/Session.hs @@ -70,7 +70,7 @@ instance IsRPC rpc => IsSession (ClientSession rpc) where type Outbound (ClientSession rpc) = ClientOutbound rpc buildProperTrailers _client = \() -> - return [] -- Request trailers are not supported by gRPC + [] -- Request trailers are not supported by gRPC parseProperTrailers _client = processResponseTrailers $ Resp.parseProperTrailers (Proxy @rpc) @@ -105,15 +105,14 @@ instance IsRPC rpc => InitiateSession (ClientSession rpc) where (fmap GRPC.getTrailersOnly . Resp.parseTrailersOnly (Proxy @rpc)) (responseHeaders info) - buildRequestInfo _ start = do - return RequestInfo { - requestMethod = rawMethod resourceHeaders - , requestPath = rawPath resourceHeaders - , requestHeaders = Req.buildHeaders (Proxy @rpc) $ - case start of - FlowStartRegular headers -> outHeaders headers - FlowStartTrailersOnly headers -> headers - } + buildRequestInfo _ start = RequestInfo { + requestMethod = rawMethod resourceHeaders + , requestPath = rawPath resourceHeaders + , requestHeaders = Req.buildHeaders (Proxy @rpc) $ + case start of + FlowStartRegular headers -> outHeaders headers + FlowStartTrailersOnly headers -> headers + } where resourceHeaders :: RawResourceHeaders resourceHeaders = buildResourceHeaders $ ResourceHeaders { diff --git a/src/Network/GRPC/Server.hs b/src/Network/GRPC/Server.hs index 346ec8db..7cb235b6 100644 --- a/src/Network/GRPC/Server.hs +++ b/src/Network/GRPC/Server.hs @@ -9,11 +9,12 @@ module Network.GRPC.Server ( , Call -- opaque , RpcHandler -- opaque , Handler.mkRpcHandler - , Handler.handlerMetadata - -- * Ongoing calls + -- * Open (ongoing) call , recvInput , sendOutput + , getRequestMetadata + , setResponseMetadata -- ** Protocol specific wrappers , recvFinalInput @@ -22,6 +23,14 @@ module Network.GRPC.Server ( , sendNextOutput , sendTrailers + -- ** Low-level API + , ProperTrailers(..) + , TrailersOnly(..) + , recvInputSTM + , sendOutputSTM + , initiateResponse + , sendTrailersOnly + -- * Common serialization formats , Protobuf ) where @@ -29,7 +38,6 @@ module Network.GRPC.Server ( import Control.Exception import Control.Tracer import Data.Text qualified as Text -import Network.HTTP.Types qualified as HTTP import Network.HTTP2.Server qualified as HTTP2 import Network.GRPC.Common.Exceptions @@ -43,8 +51,6 @@ import Network.GRPC.Server.Handler qualified as Handler import Network.GRPC.Spec import Network.GRPC.Spec.PseudoHeaders import Network.GRPC.Spec.RPC.Protobuf (Protobuf) -import Network.GRPC.Util.Session.Server qualified as Server -import Network.GRPC.Spec.Response qualified as Resp {------------------------------------------------------------------------------- Server proper @@ -66,49 +72,15 @@ handleRequest handlers conn = do -- TODO: Proper "Apache style" logging (in addition to the debug logging) traceWith tracer $ Context.NewRequest path - RpcHandler{ - handlerMetadata - , handlerRun - } <- getHandler handlers path - mCall :: Either SomeException (Call rpc) <- - try $ acceptCall conn handlerMetadata - - case mCall of - Right call -> do - -- TODO: Timeouts - -- - -- Wait-for-ready semantics makes this more complicated, maybe. - -- See example in the grpc-repo (python/wait_for_ready). - - mErr :: Either SomeException () <- try $ handlerRun call - case mErr of - Right () -> return () - Left err -> do - -- TODO: We need to think hard about error handling. - -- - -- o It should be possible to throw a specific gRPC non-OK status - -- (i.e., we should catch GrpcException and treat it special) - -- o We need to think about how streaming works with trailers, if - -- streaming goes wrong halfway - -- o We need to consider security concerns here, too - -- (exceptions can leak sensitive data) - -- - -- gRPC error responses must make use of the gRPC Trailers-Only case - -- according to the spec. - putStrLn $ "Uncaught exception: " ++ show err - putStrLn "(TODO: We need a proper handler here.)" - Left err -> do - traceWith tracer $ Context.AcceptCallFailed err - Server.respond (Connection.connectionToClient conn) $ - HTTP2.responseNoBody - HTTP.ok200 -- gRPC uses HTTP 200 even when there are gRPC errors - (Resp.buildTrailersOnly $ TrailersOnly $ ProperTrailers { - trailerGrpcStatus = GrpcError GrpcUnknown - -- TODO: Potential security concern here - -- (showing the exception)? - , trailerGrpcMessage = Just $ Text.pack $ show err - , trailerMetadata = [] - }) + RpcHandler handler <- getHandler handlers path + call <- acceptCall conn + + -- TODO: Timeouts + -- + -- Wait-for-ready semantics makes this more complicated, maybe. + -- See example in the grpc-repo (python/wait_for_ready). + + handle (forwardException call) $ handler call where path :: Path path = Connection.path conn @@ -119,6 +91,40 @@ handleRequest handlers conn = do $ Context.params $ Connection.context conn +-- | Forward exception to the client +-- +-- If the handler throws an exception, attempt to forward it to the client so +-- that it is notified something went wrong. This is a best-effort only: +-- +-- * The nature of the exception might mean that we we cannot send anything to +-- the client at all. +-- * It is possible the exception was thrown /after/ the handler already send +-- the trailers to the client. +-- +-- We therefore catch and suppress all exceptions here. +forwardException :: Call rpc -> SomeException -> IO () +forwardException call err = + handle ignoreExceptions $ + sendTrailers call trailers + where + trailers :: ProperTrailers + trailers + | Just (err' :: GrpcException) <- fromException err + = grpcExceptionToTrailers err' + + -- TODO: There might be a security concern here (server-side exceptions + -- could potentially leak some sensitive data). + | otherwise + = ProperTrailers { + trailerGrpcStatus = GrpcError GrpcUnknown + , trailerGrpcMessage = Just $ Text.pack $ show err + , trailerMetadata = [] + } + + -- See discussion above. + ignoreExceptions :: SomeException -> IO () + ignoreExceptions _ = return () + {------------------------------------------------------------------------------- Get handler for the request -------------------------------------------------------------------------------} diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs index 6cb22703..4bac343c 100644 --- a/src/Network/GRPC/Server/Call.hs +++ b/src/Network/GRPC/Server/Call.hs @@ -12,6 +12,8 @@ module Network.GRPC.Server.Call ( -- * Open (ongoing) call , recvInput , sendOutput + , getRequestMetadata + , setResponseMetadata -- ** Protocol specific wrappers , recvFinalInput @@ -19,15 +21,26 @@ module Network.GRPC.Server.Call ( , sendFinalOutput , sendNextOutput , sendTrailers + + -- ** Low-level API + , recvInputSTM + , sendOutputSTM + , initiateResponse + , sendTrailersOnly ) where import Control.Concurrent.STM import Control.Exception +import Control.Monad import Control.Tracer import Data.Bifunctor +import Data.Text qualified as Text +import Network.HTTP.Types qualified as HTTP +import Network.HTTP2.Server qualified as HTTP2 import Network.GRPC.Common.Compression (Compression) import Network.GRPC.Common.Compression qualified as Compr +import Network.GRPC.Common.Compression qualified as Compression import Network.GRPC.Common.Exceptions import Network.GRPC.Common.StreamElem (StreamElem(..)) import Network.GRPC.Server.Connection (Connection) @@ -36,37 +49,91 @@ import Network.GRPC.Server.Context qualified as Context import Network.GRPC.Server.Session import Network.GRPC.Spec import Network.GRPC.Spec.CustomMetadata +import Network.GRPC.Spec.Response qualified as Resp import Network.GRPC.Spec.RPC import Network.GRPC.Util.Session qualified as Session -import Network.GRPC.Common.Compression qualified as Compression +import Network.GRPC.Util.Session.Server qualified as Server {------------------------------------------------------------------------------- Definition -------------------------------------------------------------------------------} data Call rpc = IsRPC rpc => Call { + -- | Server state (across all calls) callSession :: ServerSession rpc + + -- | Bidirectional channel to the client , callChannel :: Session.Channel (ServerSession rpc) + + -- | Request metadata + -- + -- This is filled once we get the request headers + , callRequestMetadata :: TMVar [CustomMetadata] + + -- | Response metadata + -- + -- Metadata to be included when we send the initial response headers. + -- Can be updated until the first message (see 'callFirstMessage'). + -- + -- Defaults to the empty list. + , callResponseMetadata :: TVar [CustomMetadata] + + -- | What kicked off the response? + -- + -- This is empty until the response /has/ in fact been kicked off. + , callResponseKickoff :: TMVar Kickoff } +-- | What kicked off the response? +-- +-- When the server handler starts, we do not immediately initate the response +-- to the client, because the server might not have decided the initial response +-- metadata yet (indeed, it might need wait for some incoming messages from the +-- client before it can compute that metadata). We therefore wait until we +-- send the initial response headers, until the handler.. +-- +-- 1. explicitly calls 'initiateResponse' +-- 2. sends the first message using 'sendOutput' +-- 3. initiates and immediately terminates the response using 'sendTrailersOnly' +-- +-- We only need distinguish between (1 or 2) versus (3), corresponding precisely +-- to the two constructors of 'FlowStart'. +data Kickoff = + KickoffRegular + | KickoffTrailersOnly TrailersOnly + deriving (Show) + {------------------------------------------------------------------------------- Open a call -------------------------------------------------------------------------------} -- | Accept incoming call -acceptCall :: forall rpc. - IsRPC rpc - => Connection - -> ([CustomMetadata] -> IO [CustomMetadata]) - -> IO (Call rpc) -acceptCall conn mkResponseMetadata = do +-- +-- If an exception is thrown during call setup, we will send an error response +-- to the client (and then rethrow the exception). +acceptCall :: forall rpc. IsRPC rpc => Connection -> IO (Call rpc) +acceptCall conn = do + callRequestMetadata <- newEmptyTMVarIO + callResponseMetadata <- newTVarIO [] + callResponseKickoff <- newEmptyTMVarIO callChannel <- Session.initiateResponse callSession - tracer + (contramap (Context.PeerDebugMsg @rpc) tracer) (Connection.connectionToClient conn) - mkOutboundHeaders - return Call{callSession, callChannel} + ( handle sendErrorResponse + . mkOutboundHeaders + callRequestMetadata + callResponseMetadata + callResponseKickoff + ) + return Call{ + callSession + , callChannel + , callRequestMetadata + , callResponseMetadata + , callResponseKickoff + } where callSession :: ServerSession rpc callSession = ServerSession { @@ -76,69 +143,180 @@ acceptCall conn mkResponseMetadata = do compr :: Compr.Negotation compr = Context.serverCompression $ Context.params $ Connection.context conn - tracer :: Tracer IO (Session.DebugMsg (ServerSession rpc)) - tracer = - contramap (Context.PeerDebugMsg @rpc) $ - Context.serverDebugTracer $ Context.params (Connection.context conn) + tracer :: Tracer IO Context.ServerDebugMsg + tracer = Context.serverDebugTracer $ Context.params (Connection.context conn) mkOutboundHeaders :: - Session.FlowStart (ServerInbound rpc) + TMVar [CustomMetadata] + -> TVar [CustomMetadata] + -> TMVar Kickoff + -> Session.FlowStart (ServerInbound rpc) -> IO (Session.FlowStart (ServerOutbound rpc)) - mkOutboundHeaders start = do - responseMetadata <- mkResponseMetadata $ requestMetadata inboundHeaders - - cOut :: Compression <- - case requestAcceptCompression inboundHeaders of - Nothing -> return Compression.identity - Just cids -> - -- If the requests explicitly lists compression algorithms, and - -- that list does /not/ include @identity@, then we should not - -- default to 'Compression.identity', even if all other - -- algorithms are unsupported. This gives the client the option - -- to /insist/ on compression. - case Compression.choose compr cids of - Right c -> return c - Left err -> throwIO err - - return $ Session.FlowStartRegular $ OutboundHeaders { - outHeaders = ResponseHeaders { - responseCompression = Just $ Compr.compressionId cOut - , responseAcceptCompression = Just $ Compr.offer compr - , responseMetadata = responseMetadata + mkOutboundHeaders requestMetadataVar + responseMetadataVar + responseKickoffVar + requestStart + = do + -- Make request metadata available (see 'getRequestMetadata') + atomically $ putTMVar requestMetadataVar $ + requestMetadata inboundHeaders + + -- Wait for kickoff (see 'Kickoff' for discussion) + kickoff <- atomically $ readTMVar responseKickoffVar + + -- Get response metadata (see 'setResponseMetadata') + responseMetadata <- atomically $ readTVar responseMetadataVar + + -- Session start + case kickoff of + KickoffRegular -> do + cOut :: Compression <- + case requestAcceptCompression inboundHeaders of + Nothing -> return Compression.identity + Just cids -> + -- If the requests explicitly lists compression algorithms, + -- and that list does /not/ include @identity@, then we + -- should not default to 'Compression.identity', even if all + -- other algorithms are unsupported. This gives the client + -- the option to /insist/ on compression. + case Compression.choose compr cids of + Right c -> return c + Left err -> throwIO err + return $ Session.FlowStartRegular $ OutboundHeaders { + outHeaders = ResponseHeaders { + responseCompression = Just $ Compr.compressionId cOut + , responseAcceptCompression = Just $ Compr.offer compr + , responseMetadata = responseMetadata + } + , outCompression = cOut } - , outCompression = cOut - } + KickoffTrailersOnly trailers -> + return $ Session.FlowStartTrailersOnly trailers where inboundHeaders :: RequestHeaders inboundHeaders = - case start of + case requestStart of Session.FlowStartRegular headers -> inbHeaders headers Session.FlowStartTrailersOnly headers -> headers + -- | Send response when 'mkOutboundHeaders' fails + sendErrorResponse :: SomeException -> IO a + sendErrorResponse err = do + traceWith tracer $ Context.AcceptCallFailed err + Server.respond (Connection.connectionToClient conn) $ + HTTP2.responseNoBody + HTTP.ok200 -- gRPC uses HTTP 200 even when there are gRPC errors + (Resp.buildTrailersOnly $ TrailersOnly $ ProperTrailers { + trailerGrpcStatus = GrpcError GrpcUnknown + -- TODO: Potential security concern here + -- (showing the exception)? + , trailerGrpcMessage = Just $ Text.pack $ show err + , trailerMetadata = [] + }) + throwIO err + {------------------------------------------------------------------------------- Open (ongoing) call -------------------------------------------------------------------------------} --- | Receive RPC input +-- | Receive RPC input from the client -- -- We do not return trailers, since gRPC does not support sending trailers from -- the client to the server (only from the server to the client). -recvInput :: forall rpc. Call rpc -> STM (StreamElem () (Input rpc)) -recvInput Call{callChannel} = +recvInput :: forall rpc. Call rpc -> IO (StreamElem () (Input rpc)) +recvInput = atomically . recvInputSTM + +-- | Send RPC output to the client +sendOutput :: Call rpc -> StreamElem ProperTrailers (Output rpc) -> IO () +sendOutput call msg = do + _updated <- initiateResponse call + atomically $ sendOutputSTM call msg + +-- | Get request metadata +-- +-- This will block until we have received the initial request /headers/ (but may +-- well return before we receive the first /message/ from the client). +getRequestMetadata :: Call rpc -> IO [CustomMetadata] +getRequestMetadata Call{callRequestMetadata} = + atomically $ readTMVar callRequestMetadata + +-- | Set the initial response metadata +-- +-- This can be set at any time before the response is initiated (either +-- implicitly by calling 'sendOutput', or explicitly by calling +-- 'initiateResponse' or 'sendTrailersOnly'). If the response has already +-- been initiated (and therefore the initial response metadata already sent), +-- will throw 'ResponseAlreadyInitiated'. +-- +-- Note that this is about the /initial/ metadata; additional metadata can be +-- sent after the final message; see 'sendOutput'. +setResponseMetadata :: Call rpc -> [CustomMetadata] -> IO () +setResponseMetadata Call{ callResponseMetadata + , callResponseKickoff + } + md = atomically $ do + mKickoff <- tryReadTMVar callResponseKickoff + case mKickoff of + Nothing -> writeTVar callResponseMetadata md + Just _ -> throwSTM ResponseAlreadyInitiated + +{------------------------------------------------------------------------------- + Low-level API + + 'initiateResponse' and 'sendOutputSTM' /MUST NOT/ live in the same + transaction: in order to send a message, we need to know if the call is in + "regular" flow state or "trailers only" flow state, which we only know after + 'initiateResponse'; so if these live in the same transaction, we will + deadlock. +-------------------------------------------------------------------------------} + +-- | STM version of 'recvInput' +-- +-- Most server handlers will deal with single clients, but in principle +-- 'recvInputSTM' could be used to wait for the first message from any number +-- of clients. +recvInputSTM :: forall rpc. Call rpc -> STM (StreamElem () (Input rpc)) +recvInputSTM Call{callChannel} = first ignoreTrailersOnly <$> Session.recv callChannel where ignoreTrailersOnly :: Either RequestHeaders () -> () ignoreTrailersOnly _ = () -sendOutput :: Call rpc -> StreamElem [CustomMetadata] (Output rpc) -> STM () -sendOutput Call{callChannel} = Session.send callChannel . first mkTrailers - where - mkTrailers :: [CustomMetadata] -> ProperTrailers - mkTrailers metadata = ProperTrailers{ - trailerGrpcStatus = GrpcOk - , trailerGrpcMessage = Nothing - , trailerMetadata = metadata - } +-- | STM version of 'sendOutput' +-- +-- You /MUST/ call 'initiateResponse' before calling 'sendOutputSTM'; throws +-- 'ResponseNotInitiated' otherwise. This is a low-level API; most users can use +-- 'sendOutput' instead. +sendOutputSTM :: Call rpc -> StreamElem ProperTrailers (Output rpc) -> STM () +sendOutputSTM Call{callChannel, callResponseKickoff} msg = do + mKickoff <- tryReadTMVar callResponseKickoff + case mKickoff of + Just _ -> Session.send callChannel msg + Nothing -> throwSTM $ ResponseNotInitiated + +-- | Initiate the response +-- +-- See 'sendOutputSTM' for discusison. +-- +-- Returns 'False' if the response was already initiated. +initiateResponse :: Call rpc -> IO Bool +initiateResponse Call{callResponseKickoff} = + atomically $ tryPutTMVar callResponseKickoff KickoffRegular + +-- | TODO: Docs and test +-- +-- Throws 'ResponseAlreadyInitiated' if the response has already been initiated. +sendTrailersOnly :: Call rpc -> TrailersOnly -> IO () +sendTrailersOnly Call{callResponseKickoff} trailers = do + updated <- atomically $ tryPutTMVar callResponseKickoff $ + KickoffTrailersOnly trailers + unless updated $ throwIO ResponseAlreadyInitiated + +data ResponseKickoffException = + ResponseAlreadyInitiated + | ResponseNotInitiated + deriving stock (Show) + deriving anyclass (Exception) {------------------------------------------------------------------------------- Protocol specific wrappers @@ -152,32 +330,37 @@ sendOutput Call{callChannel} = Session.send callChannel . first mkTrailers -- we will block until we receive the end-of-stream indication. recvFinalInput :: forall rpc. Call rpc -> IO (Input rpc) recvFinalInput call@Call{} = do - inp1 <- atomically $ recvInput call + inp1 <- recvInput call case inp1 of NoMoreElems () -> throwIO $ TooFewInputs @rpc FinalElem inp () -> return inp StreamElem inp -> do - inp2 <- atomically $ recvInput call + inp2 <- recvInput call case inp2 of NoMoreElems () -> return inp FinalElem inp' _ -> throwIO $ TooManyInputs @rpc inp' StreamElem inp' -> throwIO $ TooManyInputs @rpc inp' recvNextInput :: Call rpc -> IO (StreamElem () (Input rpc)) -recvNextInput call = atomically $ recvInput call +recvNextInput call = recvInput call -sendFinalOutput :: Call rpc -> (Output rpc, [CustomMetadata]) -> IO () -sendFinalOutput call = atomically . sendOutput call . uncurry FinalElem +-- | Send final output +-- +-- See also 'sendTrailers'. +sendFinalOutput :: Call rpc -> (Output rpc, ProperTrailers) -> IO () +sendFinalOutput call = sendOutput call . uncurry FinalElem -- | Send the next output -- -- If this is the last output, you should call 'sendTrailers' after. sendNextOutput :: Call rpc -> Output rpc -> IO () -sendNextOutput call = atomically . sendOutput call . StreamElem +sendNextOutput call = sendOutput call . StreamElem -- | Send trailers -- -- This tells the client that there will be no more outputs. You should call --- this even when there /are/ no trailers (just supply the empty list). -sendTrailers :: Call rpc -> [CustomMetadata] -> IO () -sendTrailers call = atomically . sendOutput call . NoMoreElems +-- this (or 'sendFinalOutput') even when there is no special information to be +-- included in the trailers (in this case, you can use the 'Default' instance +-- for 'ProperTrailers'). +sendTrailers :: Call rpc -> ProperTrailers -> IO () +sendTrailers call = sendOutput call . NoMoreElems diff --git a/src/Network/GRPC/Server/Handler.hs b/src/Network/GRPC/Server/Handler.hs index cf6157fa..2a52c67a 100644 --- a/src/Network/GRPC/Server/Handler.hs +++ b/src/Network/GRPC/Server/Handler.hs @@ -28,7 +28,6 @@ import Data.Proxy import Data.Typeable import Network.GRPC.Server.Call -import Network.GRPC.Spec.CustomMetadata import Network.GRPC.Spec.PseudoHeaders import Network.GRPC.Spec.RPC @@ -40,23 +39,12 @@ import Network.GRPC.Spec.RPC -------------------------------------------------------------------------------} data RpcHandler m = forall rpc. IsRPC rpc => RpcHandler { - -- | Response metadata - -- - -- Since this metadata must be returned in the initial headers (before - -- communication with the client), the only context that is available is - -- the metadata that the client included in their request (and, - -- implicitly, the pseudo-headers, as they determine the handler). - -- - -- The handler can return additional metadata in the trailers at the /end/ - -- of the communication; see 'sendOutput'. - handlerMetadata :: [CustomMetadata] -> m [CustomMetadata] - -- | Handler proper - , handlerRun :: Call rpc -> m () + runRpcHandler :: Call rpc -> m () } instance Show (RpcHandler m) where - show RpcHandler{handlerRun} = aux handlerRun + show RpcHandler{runRpcHandler} = aux runRpcHandler where aux :: forall rpc. IsRPC rpc => (Call rpc -> m ()) -> String aux _ = "" @@ -65,21 +53,16 @@ instance Show (RpcHandler m) where Construction -------------------------------------------------------------------------------} --- | Default RPC handler -mkRpcHandler :: - (Monad m, IsRPC rpc) - => Proxy rpc -> (Call rpc -> m ()) -> RpcHandler m -mkRpcHandler _ handlerRun = RpcHandler{ - handlerRun - , handlerMetadata = \_ -> return [] - } +-- | Constructor for 'RpcHandler' +mkRpcHandler :: IsRPC rpc => Proxy rpc -> (Call rpc -> m ()) -> RpcHandler m +mkRpcHandler _ = RpcHandler {------------------------------------------------------------------------------- Query -------------------------------------------------------------------------------} path :: forall m. RpcHandler m -> Path -path RpcHandler{handlerRun} = aux handlerRun +path RpcHandler{runRpcHandler} = aux runRpcHandler where aux :: forall rpc. IsRPC rpc => (Call rpc -> m ()) -> Path aux _ = rpcPath (Proxy @rpc) diff --git a/src/Network/GRPC/Server/Session.hs b/src/Network/GRPC/Server/Session.hs index 6776b170..1ab548f4 100644 --- a/src/Network/GRPC/Server/Session.hs +++ b/src/Network/GRPC/Server/Session.hs @@ -63,7 +63,7 @@ instance IsRPC rpc => IsSession (ServerSession rpc) where type Outbound (ServerSession rpc) = ServerOutbound rpc parseProperTrailers _ = \_ -> return () - buildProperTrailers _ = return . Resp.buildProperTrailers + buildProperTrailers _ = Resp.buildProperTrailers parseMsg _ = LP.parseInput (Proxy @rpc) . inbCompression buildMsg _ = LP.buildOutput (Proxy @rpc) . outCompression @@ -89,16 +89,15 @@ instance IsRPC rpc => AcceptSession (ServerSession rpc) where Left err -> throwIO $ RequestInvalidHeaders err Right hdrs -> return hdrs - buildResponseInfo _ start = - return ResponseInfo { - responseStatus = HTTP.ok200 - , responseHeaders = - case start of - FlowStartRegular headers -> - Resp.buildHeaders (Proxy @rpc) (outHeaders headers) - FlowStartTrailersOnly trailers -> - Resp.buildTrailersOnly trailers - } + buildResponseInfo _ start = ResponseInfo { + responseStatus = HTTP.ok200 + , responseHeaders = + case start of + FlowStartRegular headers -> + Resp.buildHeaders (Proxy @rpc) (outHeaders headers) + FlowStartTrailersOnly trailers -> + Resp.buildTrailersOnly trailers + } {------------------------------------------------------------------------------- Exceptions diff --git a/src/Network/GRPC/Server/StreamType.hs b/src/Network/GRPC/Server/StreamType.hs index 7868146d..bfa74cb0 100644 --- a/src/Network/GRPC/Server/StreamType.hs +++ b/src/Network/GRPC/Server/StreamType.hs @@ -10,6 +10,7 @@ module Network.GRPC.Server.StreamType ( ) where import Control.Monad.IO.Class +import Data.Default import Data.Kind import Data.Proxy @@ -45,27 +46,27 @@ instance StreamingRpcHandler NonStreamingHandler where mkRpcHandler proxy $ \call -> do inp <- liftIO $ recvFinalInput call out <- h inp - liftIO $ sendFinalOutput call (out, []) + liftIO $ sendFinalOutput call (out, def) instance StreamingRpcHandler ClientStreamingHandler where streamingRpcHandler proxy (UnsafeClientStreamingHandler h) = mkRpcHandler proxy $ \call -> do out <- h (liftIO $ recvNextInput call) - liftIO $ sendFinalOutput call (out, []) + liftIO $ sendFinalOutput call (out, def) instance StreamingRpcHandler ServerStreamingHandler where streamingRpcHandler proxy (UnsafeServerStreamingHandler h) = mkRpcHandler proxy $ \call -> do inp <- liftIO $ recvFinalInput call h inp (liftIO . sendNextOutput call) - liftIO $ sendTrailers call [] + liftIO $ sendTrailers call def instance StreamingRpcHandler BiDiStreamingHandler where streamingRpcHandler proxy (UnsafeBiDiStreamingHandler h) = mkRpcHandler proxy $ \call -> do h (liftIO $ recvNextInput call) (liftIO . sendNextOutput call) - liftIO $ sendTrailers call [] + liftIO $ sendTrailers call def {------------------------------------------------------------------------------- Methods diff --git a/src/Network/GRPC/Spec.hs b/src/Network/GRPC/Spec.hs index 213d8e6f..444315f9 100644 --- a/src/Network/GRPC/Spec.hs +++ b/src/Network/GRPC/Spec.hs @@ -164,6 +164,14 @@ data ProperTrailers = ProperTrailers { deriving stock (GHC.Generic) deriving anyclass (SOP.Generic, SOP.HasDatatypeInfo) +-- | The 'Default' corresponds to a successful response +instance Default ProperTrailers where + def = ProperTrailers { + trailerGrpcStatus = GrpcOk + , trailerGrpcMessage = Nothing + , trailerMetadata = [] + } + -- | Trailers sent in the gRPC Trailers-Only case -- -- In the current version of the spec, the information in 'TrailersOnly' is diff --git a/src/Network/GRPC/Util/Session/API.hs b/src/Network/GRPC/Util/Session/API.hs index ea6a8924..90ddd144 100644 --- a/src/Network/GRPC/Util/Session/API.hs +++ b/src/Network/GRPC/Util/Session/API.hs @@ -105,7 +105,7 @@ class ( DataFlow (Inbound sess) -- | Build proper trailers buildProperTrailers :: sess - -> ProperTrailers (Outbound sess) -> IO [HTTP.Header] + -> ProperTrailers (Outbound sess) -> [HTTP.Header] -- | Parse message parseMsg :: @@ -126,7 +126,7 @@ class IsSession sess => InitiateSession sess where -- | Build 'RequestInfo' for the server buildRequestInfo :: sess - -> FlowStart (Outbound sess) -> IO RequestInfo + -> FlowStart (Outbound sess) -> RequestInfo -- | Parse 'ResponseInfo' from the server, regular case -- @@ -140,8 +140,6 @@ class IsSession sess => InitiateSession sess where sess -> ResponseInfo -> IO (TrailersOnly (Inbound sess)) - -- | Parse - -- | Accept session -- -- A server node listens and accepts incoming requests from client nodes. @@ -162,7 +160,7 @@ class IsSession sess => AcceptSession sess where buildResponseInfo :: sess -> FlowStart (Outbound sess) - -> IO ResponseInfo + -> ResponseInfo {------------------------------------------------------------------------------- Exceptions diff --git a/src/Network/GRPC/Util/Session/Channel.hs b/src/Network/GRPC/Util/Session/Channel.hs index fc6ce818..4201e77b 100644 --- a/src/Network/GRPC/Util/Session/Channel.hs +++ b/src/Network/GRPC/Util/Session/Channel.hs @@ -377,7 +377,7 @@ processOutboundTrailers sess st = go go (Just _) = return $ HTTP2.NextTrailersMaker go go Nothing = do outboundTrailers <- atomically $ readTMVar (flowTrailers st) - HTTP2.Trailers <$> buildProperTrailers sess outboundTrailers + return $ HTTP2.Trailers $ buildProperTrailers sess outboundTrailers data DebugMsg sess = -- | Thread sending messages is awaiting a message diff --git a/src/Network/GRPC/Util/Session/Client.hs b/src/Network/GRPC/Util/Session/Client.hs index 2db622b7..65727e32 100644 --- a/src/Network/GRPC/Util/Session/Client.hs +++ b/src/Network/GRPC/Util/Session/Client.hs @@ -76,8 +76,8 @@ initiateRequest :: forall sess. -> FlowStart (Outbound sess) -> IO (Channel sess) initiateRequest sess tracer ConnectionToServer{sendRequest} outboundStart = do - channel <- initChannel - requestInfo <- buildRequestInfo sess outboundStart + channel <- initChannel + let requestInfo = buildRequestInfo sess outboundStart case outboundStart of FlowStartRegular headers -> do diff --git a/src/Network/GRPC/Util/Session/Server.hs b/src/Network/GRPC/Util/Session/Server.hs index a9777795..13166f3c 100644 --- a/src/Network/GRPC/Util/Session/Server.hs +++ b/src/Network/GRPC/Util/Session/Server.hs @@ -38,6 +38,15 @@ initiateResponse :: forall sess. -> Tracer IO (DebugMsg sess) -> ConnectionToClient -> (FlowStart (Inbound sess) -> IO (FlowStart (Outbound sess))) + -- ^ Construct headers for the initial response + -- + -- This function has quite a bit of control over how the outbound part of + -- the channel gets established: + -- + -- * If it blocks, the response will not be initiated until it returns. + -- * If it throws an exception, no response will be attempted at all. This + -- allows the function to send a response of its own (typically, some kind + -- of error response). -> IO (Channel sess) initiateResponse sess tracer conn startOutbound = do channel <- initChannel @@ -56,8 +65,6 @@ initiateResponse sess tracer conn startOutbound = do FlowStartTrailersOnly <$> parseRequestTrailersOnly sess requestInfo else FlowStartRegular <$> parseRequestRegular sess requestInfo - outboundStart <- startOutbound inboundStart - responseInfo <- buildResponseInfo sess outboundStart void $ forkIO $ threadBody (channelInbound channel) newEmptyTMVarIO $ \stVar -> do @@ -72,6 +79,8 @@ initiateResponse sess tracer conn startOutbound = do void $ forkIO $ threadBody (channelOutbound channel) newEmptyTMVarIO $ \stVar -> do + outboundStart <- startOutbound inboundStart + let responseInfo = buildResponseInfo sess outboundStart case outboundStart of FlowStartRegular headers -> do regular <- initFlowStateRegular headers diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index 73313019..8842834f 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -4,34 +4,19 @@ module Test.Driver.ClientServer ( -- * Basic client-server test ClientServerTest(..) , testClientServer - -- * Dialogue-based - , RequestState(..) - , ResponseState(..) - , ClientState - , ServerState - , Interaction(..) - , Interactions(..) - , Dialogue(..) - , execDialogue -- * Re-exports , module Test.Util.ClientServer ) where -import Control.Concurrent import Control.Exception import Data.Default -import Data.Kind -import Data.SOP import Data.Typeable import Test.Tasty.HUnit import Network.GRPC.Client qualified as Client -import Network.GRPC.Common.CustomMetadata (CustomMetadata) import Network.GRPC.Server qualified as Server -import Network.GRPC.Common.Binary import Test.Util.ClientServer -import Test.Util.SOP {------------------------------------------------------------------------------- Basic client-server test @@ -73,107 +58,3 @@ testClientServer ClientServerTest{config, client, server} = do Right () -> return "" -{------------------------------------------------------------------------------- - Dialogue --------------------------------------------------------------------------------} - -type DialogueRpc = BinaryRpc "binary" "dialogue" - -data RequestState = - RequestStarted - | RequestEnded - -type ClientState = [RequestState] - -data RequestData :: RequestState -> Type where - RequestOpen :: Client.Call DialogueRpc -> RequestData RequestStarted - RequestClosed :: RequestData RequestEnded - -data ResponseState = - ResponseStarted - | ResponseEnded - -type ServerState = [ResponseState] - -data Interaction :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where - StartRequest :: - Maybe Client.Timeout - -> [CustomMetadata] - -> Interaction '(reqs, resps) '(RequestStarted ': reqs, resps) - CloseRequest :: - Update RequestData RequestStarted RequestEnded reqs reqs' - -> Interaction '(reqs, resps) '(reqs', resps) - -deriving stock instance Show (Interaction st st') - -data Interactions :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where - Done :: Interactions '(client0, server0) '(client0, server0) - Step :: Interaction '(client0, server0) '(client1, server1) - -> Interactions '(client1, server1) '(client2, server2) - -> Interactions '(client0, server0) '(client2, server2) - -deriving stock instance Show (Interactions st st') - -data Dialogue :: Type where - Dialogue :: Interactions '( '[] , '[] ) '(reqs , resps) -> Dialogue - -deriving instance Show Dialogue - -execDialogue :: Dialogue -> ClientServerTest -execDialogue (Dialogue interactions) = def { - client = \conn -> - clientSide conn interactions - , server = do - interactionsVar <- newMVar interactions - return [serverSide interactionsVar] - } - where - clientSide :: - Client.Connection - -> Interactions '( '[], '[] ) '(reqs, resps) - -> IO () - clientSide conn = go Nil - where - go :: NP RequestData reqs -> Interactions '(reqs, a) '(reqs', b) -> IO () - go _ Done = - return () - go reqs (Step (StartRequest timeout metadata) is) = do - req <- aux - go (req :* reqs) is - where - aux :: IO (RequestData 'RequestStarted) - aux = - RequestOpen <$> - Client.startRPC conn params (Proxy @DialogueRpc) - - params :: Client.CallParams - params = Client.CallParams { - callTimeout = timeout - , callRequestMetadata = metadata - } - go reqs (Step (CloseRequest ix) is) = do - reqs' <- updateAtM aux ix reqs - go reqs' is - where - aux :: RequestData 'RequestStarted -> IO (RequestData 'RequestEnded) - aux (RequestOpen call) = do - Client.abortRPC call - return $ RequestClosed - - serverSide :: - MVar (Interactions '( '[], '[] ) '(reqs, resps)) - -> Server.RpcHandler IO - serverSide _interactionsVar = (Server.mkRpcHandler (Proxy @DialogueRpc) go) { - Server.handlerMetadata = \_requestMetadata -> - -- TODO: Hmm this is weird. we don't get to control /when/ the - -- response is sent back; what if we want the response metadata - -- to depend on some input /messages/ and not just the input - -- metadata? This is a problem with the API - undefined - } - where - go :: Server.Call DialogueRpc -> IO () - go _call = undefined - - - diff --git a/test-grapesy/Test/Driver/Dialogue.hs b/test-grapesy/Test/Driver/Dialogue.hs new file mode 100644 index 00000000..40577d27 --- /dev/null +++ b/test-grapesy/Test/Driver/Dialogue.hs @@ -0,0 +1,129 @@ +module Test.Driver.Dialogue ( + RequestState(..) + , ResponseState(..) + , ClientState + , ServerState + , Interaction(..) + , Interactions(..) + , Dialogue(..) + , execDialogue + ) where + +import Control.Concurrent +import Data.Default +import Data.Kind +import Data.SOP + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Common.CustomMetadata (CustomMetadata) +import Network.GRPC.Server qualified as Server +import Network.GRPC.Common.Binary + +import Test.Driver.ClientServer +import Test.Util.SOP + +{------------------------------------------------------------------------------- + Dialogue +-------------------------------------------------------------------------------} + +type DialogueRpc = BinaryRpc "binary" "dialogue" + +data RequestState = + RequestStarted + | RequestEnded + +type ClientState = [RequestState] + +data RequestData :: RequestState -> Type where + RequestOpen :: Client.Call DialogueRpc -> RequestData RequestStarted + RequestClosed :: RequestData RequestEnded + +data ResponseState = + ResponseStarted + | ResponseEnded + +type ServerState = [ResponseState] + +data Interaction :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where + StartRequest :: + Maybe Client.Timeout + -> [CustomMetadata] + -> Interaction '(reqs, resps) '(RequestStarted ': reqs, resps) + CloseRequest :: + Update RequestData RequestStarted RequestEnded reqs reqs' + -> Interaction '(reqs, resps) '(reqs', resps) + +deriving stock instance Show (Interaction st st') + +data Interactions :: (ClientState, ServerState) -> (ClientState, ServerState) -> Type where + Done :: Interactions '(client0, server0) '(client0, server0) + Step :: Interaction '(client0, server0) '(client1, server1) + -> Interactions '(client1, server1) '(client2, server2) + -> Interactions '(client0, server0) '(client2, server2) + +deriving stock instance Show (Interactions st st') + +data Dialogue :: Type where + Dialogue :: Interactions '( '[] , '[] ) '(reqs , resps) -> Dialogue + +deriving instance Show Dialogue + +execDialogue :: Dialogue -> ClientServerTest +execDialogue (Dialogue interactions) = def { + client = \conn -> + clientSide conn interactions + , server = do + interactionsVar <- newMVar interactions + return [serverSide interactionsVar] + } + where + clientSide :: + Client.Connection + -> Interactions '( '[], '[] ) '(reqs, resps) + -> IO () + clientSide conn = go Nil + where + go :: NP RequestData reqs -> Interactions '(reqs, a) '(reqs', b) -> IO () + go _ Done = + return () + go reqs (Step (StartRequest timeout metadata) is) = do + req <- aux + go (req :* reqs) is + where + aux :: IO (RequestData 'RequestStarted) + aux = + RequestOpen <$> + Client.startRPC conn params (Proxy @DialogueRpc) + + params :: Client.CallParams + params = Client.CallParams { + callTimeout = timeout + , callRequestMetadata = metadata + } + go reqs (Step (CloseRequest ix) is) = do + reqs' <- updateAtM aux ix reqs + go reqs' is + where + aux :: RequestData 'RequestStarted -> IO (RequestData 'RequestEnded) + aux (RequestOpen call) = do + Client.abortRPC call + return $ RequestClosed + + serverSide :: + MVar (Interactions '( '[], '[] ) '(reqs, resps)) + -> Server.RpcHandler IO + serverSide _interactionsVar = undefined {- (Server.mkRpcHandler (Proxy @DialogueRpc) go) { + Server.handlerMetadata = \_requestMetadata -> + -- TODO: Hmm this is weird. we don't get to control /when/ the + -- response is sent back; what if we want the response metadata + -- to depend on some input /messages/ and not just the input + -- metadata? This is a problem with the API + undefined + } + where + go :: Server.Call DialogueRpc -> IO () + go _call = undefined + + -} + +