Skip to content

Commit

Permalink
WIP: Ensure deadlock freedom
Browse files Browse the repository at this point in the history
This makes a bunch of subtle adjustments that ensure that test failures and/or
network failures never result in deadlocks.
  • Loading branch information
edsko committed Jul 28, 2023
1 parent f93655f commit 5997ab3
Show file tree
Hide file tree
Showing 45 changed files with 2,227 additions and 1,427 deletions.
7 changes: 5 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ package grapesy

source-repository-package
type: git
location: https://github.com/kazu-yamamoto/http2.git
tag: 059b24427ef33e7a0f8cb1a06dcf229590bd2d48
-- https://github.com/kazu-yamamoto/http2/pull/81
location: https://github.com/edsko/http2.git
tag: 9e7713bedc4788c0d117c3abea9d3dc5e046c377
-- location: https://github.com/kazu-yamamoto/http2.git
-- tag: 059b24427ef33e7a0f8cb1a06dcf229590bd2d48
9 changes: 4 additions & 5 deletions demo-client/Demo/Client/API/Core/NoFinal/Greeter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ module Demo.Client.API.Core.NoFinal.Greeter (
sayHello
) where

import Control.Concurrent.STM
import Data.Default
import Data.Proxy

import Network.GRPC.Client
import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common

import Proto.Helloworld

Expand All @@ -18,8 +17,8 @@ import Proto.Helloworld
sayHello :: Connection -> HelloRequest -> IO ()
sayHello conn n =
withRPC conn def (Proxy @(Protobuf Greeter "sayHello")) $ \call -> do
atomically $ sendInput call $ StreamElem n
out <- atomically $ recvOutput call
trailers <- atomically $ recvOutput call
sendInput call $ StreamElem n
out <- recvOutput call
trailers <- recvOutput call
print (out, trailers)

10 changes: 5 additions & 5 deletions demo-client/Demo/Client/API/Protobuf/Pipes/RouteGuide.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Pipes.Safe

import Network.GRPC.Client
import Network.GRPC.Client.StreamType.Pipes
import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common

import Proto.RouteGuide

Expand All @@ -34,23 +34,23 @@ listFeatures conn r = runSafeT . runEffect $

recordRoute ::
Connection
-> Producer' (StreamElem () Point) (SafeT IO) ()
-> Producer' (StreamElem NoMetadata Point) (SafeT IO) ()
-> IO ()
recordRoute conn ps = runSafeT . runEffect $
ps >-> (cons >>= logMsg)
where
cons :: Consumer' (StreamElem () Point) (SafeT IO) RouteSummary
cons :: Consumer' (StreamElem NoMetadata Point) (SafeT IO) RouteSummary
cons = clientStreaming conn def (Proxy @(Protobuf RouteGuide "recordRoute"))

routeChat ::
Connection
-> Producer' (StreamElem () RouteNote) IO ()
-> Producer' (StreamElem NoMetadata RouteNote) IO ()
-> IO ()
routeChat conn ns =
biDiStreaming conn def (Proxy @(Protobuf RouteGuide "routeChat")) aux
where
aux ::
Consumer' (StreamElem () RouteNote) IO ()
Consumer' (StreamElem NoMetadata RouteNote) IO ()
-> Producer' RouteNote IO ()
-> IO ()
aux cons prod =
Expand Down
6 changes: 3 additions & 3 deletions demo-client/Demo/Client/API/Protobuf/RouteGuide.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Demo.Client.API.Protobuf.RouteGuide (

import Network.GRPC.Client
import Network.GRPC.Client.StreamType
import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common
import Network.GRPC.Common.StreamType

import Proto.RouteGuide
Expand All @@ -29,13 +29,13 @@ listFeatures conn rect = do
serverStreaming (rpc @(Protobuf RouteGuide "listFeatures") conn) rect $
logMsg

recordRoute :: Connection -> IO (StreamElem () Point) -> IO ()
recordRoute :: Connection -> IO (StreamElem NoMetadata Point) -> IO ()
recordRoute conn getPoint = do
summary <-
clientStreaming (rpc @(Protobuf RouteGuide "recordRoute") conn) getPoint
logMsg summary

routeChat :: Connection -> IO (StreamElem () RouteNote) -> IO ()
routeChat :: Connection -> IO (StreamElem NoMetadata RouteNote) -> IO ()
routeChat conn getNote = do
biDiStreaming (rpc @(Protobuf RouteGuide "routeChat") conn) getNote $
logMsg
22 changes: 12 additions & 10 deletions demo-client/Demo/Client/Util/DelayOr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Data.List.NonEmpty (NonEmpty(..))
import Data.List.NonEmpty qualified as NE
import Pipes

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common

import Demo.Common.Logging

Expand All @@ -24,13 +24,15 @@ isDelay :: DelayOr a -> Either Double a
isDelay (Delay d) = Left d
isDelay (Exec a) = Right a

execAll :: forall a. Show a => [DelayOr a] -> IO (IO (StreamElem () a))
execAll :: forall a. Show a => [DelayOr a] -> IO (IO (StreamElem NoMetadata a))
execAll =
fmap (flip modifyMVar getNext) . newMVar . alternating . map isDelay
where
getNext :: AltLists Double a -> IO (AltLists Double a, (StreamElem () a))
getNext ::
AltLists Double a
-> IO (AltLists Double a, (StreamElem NoMetadata a))
getNext (Alternating Nil) =
return (Alternating Nil, NoMoreElems ())
return (Alternating Nil, NoMoreElems NoMetadata)
getNext (Alternating (Lft ds xss)) = do
let d = sum ds
traceWith threadSafeTracer $ "Delay " ++ show d ++ "s"
Expand All @@ -48,14 +50,14 @@ execAll =

yieldAll :: forall a m.
(MonadIO m, Show a)
=> [DelayOr a] -> Producer' (StreamElem () a) m ()
=> [DelayOr a] -> Producer' (StreamElem NoMetadata a) m ()
yieldAll = withAlternating go . alternating . map isDelay
where
go ::
Alt d (NonEmpty Double) (NonEmpty a)
-> Producer' (StreamElem () a) m ()
-> Producer' (StreamElem NoMetadata a) m ()
go Nil =
yield $ NoMoreElems ()
yield $ NoMoreElems NoMetadata
go (Lft ds xss) = do
let d = sum ds
liftIO $ do
Expand All @@ -73,9 +75,9 @@ yieldAll = withAlternating go . alternating . map isDelay
checkIsFinal ::
NonEmpty a
-> Alt L (NonEmpty Double) (NonEmpty a)
-> StreamElem () a
checkIsFinal (a :| []) Nil = FinalElem a ()
checkIsFinal (a :| []) (Lft _ Nil) = FinalElem a ()
-> StreamElem NoMetadata a
checkIsFinal (a :| []) Nil = FinalElem a NoMetadata
checkIsFinal (a :| []) (Lft _ Nil) = FinalElem a NoMetadata
checkIsFinal (a :| []) (Lft _ (Rgt _ _)) = StreamElem a
checkIsFinal (a :| (_ : _)) _ = StreamElem a

Expand Down
2 changes: 1 addition & 1 deletion demo-server/Demo/Server/Service/Greeter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Data.ProtoLens.Labels ()
import Data.Proxy
import Data.Text (Text)

import Network.GRPC.Common.CustomMetadata (CustomMetadata(..))
import Network.GRPC.Common
import Network.GRPC.Common.StreamType
import Network.GRPC.Server
import Network.GRPC.Server.Protobuf
Expand Down
6 changes: 3 additions & 3 deletions demo-server/Demo/Server/Service/RouteGuide.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Data.ProtoLens.Labels ()
import Data.Proxy
import Data.Time

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Common.StreamType
import Network.GRPC.Server
Expand Down Expand Up @@ -54,7 +54,7 @@ getFeature db p = return $ fromMaybe defMessage $ featureAt db p
listFeatures :: [Feature] -> Rectangle -> (Feature -> IO ()) -> IO ()
listFeatures db r send = mapM_ send $ filter (inRectangle r . view #location) db

recordRoute :: [Feature] -> IO (StreamElem () Point) -> IO RouteSummary
recordRoute :: [Feature] -> IO (StreamElem NoMetadata Point) -> IO RouteSummary
recordRoute db recv = do
start <- getCurrentTime
ps <- StreamElem.collect recv
Expand All @@ -63,7 +63,7 @@ recordRoute db recv = do

routeChat ::
[Feature]
-> IO (StreamElem () RouteNote)
-> IO (StreamElem NoMetadata RouteNote)
-> (RouteNote -> IO ())
-> IO ()
routeChat _db recv send = do
Expand Down
5 changes: 1 addition & 4 deletions demo-server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,10 @@ getRouteGuideDb = do
Nothing -> error "Could not parse the route guide DB"

serverParams :: Cmdline -> ServerParams
serverParams cmd = ServerParams {
serverParams cmd = def {
serverDebugTracer =
if cmdDebug cmd
then contramap show threadSafeTracer
else serverDebugTracer def

, serverCompression =
def
}

17 changes: 14 additions & 3 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ common lang
DeriveGeneric
DeriveTraversable
DerivingStrategies
DerivingVia
DisambiguateRecordFields
EmptyCase
FlexibleContexts
Expand Down Expand Up @@ -77,10 +78,9 @@ library
Network.GRPC.Client.Binary
Network.GRPC.Client.StreamType
Network.GRPC.Client.StreamType.Pipes
Network.GRPC.Common
Network.GRPC.Common.Binary
Network.GRPC.Common.Compression
Network.GRPC.Common.CustomMetadata
Network.GRPC.Common.Exceptions
Network.GRPC.Common.StreamElem
Network.GRPC.Common.StreamType
Network.GRPC.Server
Expand Down Expand Up @@ -117,6 +117,7 @@ library
Network.GRPC.Util.HTTP2.TLS
Network.GRPC.Util.Parser
Network.GRPC.Util.Partial
Network.GRPC.Util.PrettyVal
Network.GRPC.Util.RedundantConstraint
Network.GRPC.Util.Session
Network.GRPC.Util.Session.API
Expand Down Expand Up @@ -157,6 +158,7 @@ library
, network-run >= 0.2 && < 0.3
, pipes >= 4.3 && < 4.4
, pipes-safe >= 2.3 && < 2.4
, pretty-show
, proto-lens >= 0.7 && < 0.8
, sop-core >= 0.5 && < 0.6
, stm >= 2.5 && < 2.6
Expand Down Expand Up @@ -206,8 +208,14 @@ test-suite test-grapesy
Paths_grapesy
Test.Driver.ClientServer
Test.Driver.Dialogue
Test.Driver.Dialogue.Definition
Test.Driver.Dialogue.Execution
Test.Driver.Dialogue.Generation
Test.Driver.Dialogue.TestClock
Test.Prop.Dialogue
Test.Sanity.StreamingType.NonStreaming
Test.Util.ClientServer
Test.Util.PrettyVal
build-depends:
-- Internal dependencies
, grapesy
Expand All @@ -219,12 +227,14 @@ test-suite test-grapesy
, contra-tracer >= 0.2 && < 0.3
, data-default >= 0.7 && < 0.8
, exceptions >= 0.10 && < 0.11
, http2
, mtl >= 2.2 && < 2.4
, pretty-show
, QuickCheck >= 2.14 && < 2.15
, stm >= 2.5 && < 2.6
, tasty >= 1.4 && < 1.5
, tasty-hunit >= 0.10 && < 0.11
-- , tasty-quickcheck >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.11
, text >= 1.2 && < 2.1
, tls >= 1.5 && < 1.8

Expand Down Expand Up @@ -342,6 +352,7 @@ executable test-stress
, containers >= 0.6 && < 0.7
, contra-tracer >= 0.2 && < 0.3
, data-default >= 0.7 && < 0.8
, http2
, optparse-applicative >= 0.16 && < 0.19
, text >= 1.2 && < 2.1
, tls >= 1.5 && < 1.8
Expand Down
79 changes: 6 additions & 73 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ module Network.GRPC.Client (
-- * Make RPCs
, Call -- opaque
, withRPC
, startRPC
, abortRPC

-- ** Call parameters
, CallParams(..)
Expand All @@ -46,88 +44,23 @@ module Network.GRPC.Client (
, recvFinalOutput
, recvAllOutputs

-- ** Low-level API
, sendInputSTM
, recvOutputSTM
, startRPC
, closeRPC

-- * Common serialization formats
, Protobuf
) where

import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Proxy
import GHC.Stack

import Network.GRPC.Client.Call
import Network.GRPC.Client.Connection
import Network.GRPC.Spec
import Network.GRPC.Spec.PseudoHeaders (Scheme(..), Authority(..))
import Network.GRPC.Spec.RPC
import Network.GRPC.Spec.RPC.Protobuf (Protobuf)
import Network.GRPC.Util.TLS qualified as Util.TLS

{-------------------------------------------------------------------------------
Make RPCs
-------------------------------------------------------------------------------}

-- | Start RPC call
--
-- This is a non-blocking call; the connection will be set up in a background
-- thread; if this takes time, then the first call to 'sendInput' or
-- 'recvOutput' will block, but the call to 'startRPC' itself will not block.
-- This non-blocking nature makes this safe to use in 'bracket' patterns.
--
-- This is a low-level API. Consider using 'withRPC' instead.
startRPC ::
IsRPC rpc
=> Connection -> CallParams -> Proxy rpc -> IO (Call rpc)
startRPC = initiateCall

-- | Stop RPC call
--
-- This is a low-level API. Consider using 'withRPC' instead.
--
-- TODO: Say say something about why this is called abort.
--
-- NOTE: When an 'Call' is closed, it remembers the /reason/ why it was closed.
-- For example, if it is closed due to a network exception, then this network
-- exception is recorded as part of this reason. When the call is closed due to
-- call to 'stopRPC', we use the 'CallAborted' exception. /If/ the call to
-- 'stopRPC' /itself/ was due to an exception, this exception will not be
-- recorded; if that is undesirable, consider using 'withRPC' instead.
abortRPC :: HasCallStack => Call rpc -> IO ()
abortRPC call = abortCall call $ CallAborted callStack

-- | Scoped RPC call
--
-- May throw
--
-- * 'RpcImmediateError' when we fail to establish the call
-- * 'CallClosed' when attempting to send or receive data an a closed call.
withRPC :: forall m rpc a.
(MonadMask m, MonadIO m, IsRPC rpc)
=> Connection -> CallParams -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC conn params proxy = fmap aux .
generalBracket
(liftIO $ startRPC conn params proxy)
(\call -> liftIO . \case
ExitCaseSuccess _ -> abortCall call $ CallAborted callStack
ExitCaseException e -> abortCall call e
ExitCaseAbort -> abortCall call UnknownException
)
where
aux :: (a, ()) -> a
aux = fst

-- | Exception corresponding to 'stopRPC'
--
-- We record the callstack to 'stopRPC'.
data CallAborted = CallAborted CallStack
deriving stock (Show)
deriving anyclass (Exception)

-- | Exception corresponding to the 'ExitCaseAbort' case of 'ExitCase'
data UnknownException = UnknownException
deriving stock (Show)
deriving anyclass (Exception)

{-------------------------------------------------------------------------------
Ongoing calls
-------------------------------------------------------------------------------}
Expand Down
Loading

0 comments on commit 5997ab3

Please sign in to comment.