From ce75d1125d9f7fee13c66db97af84e30617de7e8 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Wed, 7 Aug 2024 19:08:32 -0700 Subject: [PATCH] Explicitly cancel streams on `closeRPC` Using the new `outBodyCancel`, the client can now clearly indicate that it is closing the stream as the result of an exception, and the server can handle it properly. --- grapesy.cabal | 4 +- .../Client/TestCase/CancelAfterBegin.hs | 14 +-- interop/Interop/Server.hs | 2 +- src/Network/GRPC/Client/Call.hs | 35 +++++- .../Test/Driver/Dialogue/Execution.hs | 54 ++++---- test-grapesy/Test/Prop/Dialogue.hs | 118 ++++++++++++------ test-grapesy/Test/Sanity/Exception.hs | 20 ++- util/Network/GRPC/Util/Session.hs | 1 + util/Network/GRPC/Util/Session/Channel.hs | 2 +- util/Network/GRPC/Util/Session/Client.hs | 21 +++- 10 files changed, 169 insertions(+), 102 deletions(-) diff --git a/grapesy.cabal b/grapesy.cabal index 268dc349..5e854ba0 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -212,7 +212,7 @@ library , exceptions >= 0.10 && < 0.11 , hashable >= 1.3 && < 1.5 , http-types >= 0.12 && < 0.13 - , http2 >= 5.3.1 && < 5.4 + , http2 >= 5.3.4 && < 5.4 , http2-tls >= 0.4.1 && < 0.5 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 @@ -350,7 +350,7 @@ test-suite test-grapesy , containers >= 0.6 && < 0.8 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 - , http2 >= 5.3.1 && < 5.4 + , http2 >= 5.3.4 && < 5.4 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 diff --git a/interop/Interop/Client/TestCase/CancelAfterBegin.hs b/interop/Interop/Client/TestCase/CancelAfterBegin.hs index f65fb76a..9d8a1644 100644 --- a/interop/Interop/Client/TestCase/CancelAfterBegin.hs +++ b/interop/Interop/Client/TestCase/CancelAfterBegin.hs @@ -5,19 +5,19 @@ import Network.GRPC.Common import Interop.Client.Connect import Interop.Cmdline -import Interop.Util.Exceptions import Proto.API.Interop -- | -- -- This is not really testing anything about the server, but rather about how --- cancellation gets reported by the grapesy client library itself. +-- cancellation is handled by the grapesy client itself. In particular, +-- immediate cancellation does not result in any client-side exceptions or +-- errors. runTest :: Cmdline -> IO () runTest cmdline = - withConnection def (testServer cmdline) $ \conn -> do - assertThrows (assertEqual GrpcCancelled . grpcError) $ - withRPC conn def (Proxy @StreamingInputCall) $ \_call -> - -- Immediately cancel request - return () + withConnection def (testServer cmdline) $ \conn -> + withRPC conn def (Proxy @StreamingInputCall) $ \_call -> + -- Immediately cancel request + return () diff --git a/interop/Interop/Server.hs b/interop/Interop/Server.hs index 86ebfda7..b2f87a12 100644 --- a/interop/Interop/Server.hs +++ b/interop/Interop/Server.hs @@ -88,7 +88,7 @@ withInteropServer cmdline k = do = ServerConfig { serverSecure = Nothing , serverInsecure = Just InsecureConfig { - insecureHost = Nothing + insecureHost = Just "127.0.0.1" , insecurePort = cmdPort cmdline } } diff --git a/src/Network/GRPC/Client/Call.hs b/src/Network/GRPC/Client/Call.hs index 292f0ce7..50ef186e 100644 --- a/src/Network/GRPC/Client/Call.hs +++ b/src/Network/GRPC/Client/Call.hs @@ -111,10 +111,33 @@ withRPC conn callParams proxy k = fmap fst $ generalBracket (liftIO $ startRPC conn proxy callParams) closeRPC - k + (k . fst) where - closeRPC :: Call rpc -> ExitCase a -> m () - closeRPC call exitCase = liftIO $ do + closeRPC :: (Call rpc, Session.CancelRequest) -> ExitCase a -> m () + closeRPC (call, cancelRequest) exitCase = liftIO $ do + -- When we call 'Session.close', we will terminate the + -- 'sendMessageLoop', @http2@ will interpret this as a clean termination + -- of the stream. We must therefore cancel this stream before calling + -- 'Session.close'. /If/ the final message has already been sent, + -- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that + -- cancellation will be a no-op. + cancelRequest $ + case exitCase of + ExitCaseSuccess _ -> + -- Error code will be CANCEL + Nothing + ExitCaseAbort -> + -- Error code will be INTERNAL_ERROR. The client aborted with an + -- error that we don't have access to. We want to tell the server + -- that something has gone wrong (i.e. INTERNAL_ERROR), so we must + -- pass an exception, however the exact nature of the exception is + -- not particularly important as it is only recorded locally. + Just . toException $ Session.ChannelAborted callStack + ExitCaseException e -> + -- Error code will be INTERNAL_ERROR + Just e + + -- Close /after/ cancelling mException <- liftIO $ Session.close (callChannel call) exitCase case mException of Nothing -> return () @@ -186,7 +209,7 @@ startRPC :: forall rpc. => Connection -> Proxy rpc -> CallParams rpc - -> IO (Call rpc) + -> IO (Call rpc, Session.CancelRequest) startRPC conn _ callParams = do (connClosed, connToServer) <- Connection.getConnectionToServer conn cOut <- Connection.getOutboundCompression conn @@ -205,7 +228,7 @@ startRPC conn _ callParams = do . grpcClassifyTermination . either trailersOnlyToProperTrailers' id - channel <- + (channel, cancelRequest) <- Session.setupRequestChannel session connToServer @@ -235,7 +258,7 @@ startRPC conn _ callParams = do _mAlreadyClosed <- Session.close channel exitReason return () - return $ Call channel + return (Call channel, cancelRequest) where connParams :: ConnParams connParams = Connection.connParams conn diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 13eea7bb..62fd76b4 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -2,7 +2,8 @@ {-# OPTIONS_GHC -Wno-orphans #-} module Test.Driver.Dialogue.Execution ( - execGlobalSteps + ConnUsage(..) + , execGlobalSteps ) where import Control.Concurrent @@ -16,6 +17,7 @@ import Data.Proxy import Data.Text qualified as Text import GHC.Stack import GHC.TypeLits +import Network.HTTP2.Client qualified as HTTP2.Client import Network.GRPC.Client qualified as Client import Network.GRPC.Client.Binary qualified as Client.Binary @@ -271,21 +273,17 @@ clientLocal clock call = \(LocalSteps steps) -> clientGlobal :: TestClock - -> Bool + -> ConnUsage -- ^ Use new connection for each RPC call? -- -- Multiple RPC calls on a single connection /ought/ to be independent of - -- each other, with something going wrong on one should not affect another. - -- This is currently however not the case, I /think/ due to limitations of - -- @http2@. - -- - -- See . + -- each other. Something going wrong on one should not affect another. -> GlobalSteps -> TestClient -clientGlobal clock connPerRPC global connParams testServer delimitTestScope = - if connPerRPC - then go Nothing [] (getGlobalSteps global) - else withConn $ \c -> go (Just c) [] (getGlobalSteps global) +clientGlobal clock connUsage global connParams testServer delimitTestScope = + case connUsage of + ConnPerRPC -> go Nothing [] (getGlobalSteps global) + SharedConn -> withConn $ \c -> go (Just c) [] (getGlobalSteps global) where withConn :: (Client.Connection -> IO ()) -> IO () withConn = Client.withConnection connParams testServer @@ -413,12 +411,7 @@ serverLocal clock call = \(LocalSteps steps) -> do Terminate mErr -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call - -- TODO: - -- - -- On the server side we cannot distinguish regular client - -- termination from an exception when receiving. - let expectation = isExpectedElem $ NoMoreElems NoMetadata - expect (tick, action) expectation mInp + expect (tick, action) isClientDisconnected mInp modify $ ifPeerAlive $ PeerTerminated $ DeliberateException <$> mErr -- Wait for the client disconnect to become visible @@ -428,12 +421,6 @@ serverLocal clock call = \(LocalSteps steps) -> do -- terminate more-or-less immediately, this does not necessarily indicate -- any kind of failure: the client may simply have put the call in -- half-closed mode. - -- - -- TODO: - -- However, when the client terminates early and we are not using one - -- connection per RPC (i.e. we are sharing a connection), the server will - -- /never/ realize that the client has disappeared. See the discussion in - -- the issue above. waitForClientDisconnect :: IO () waitForClientDisconnect = within timeoutFailure () $ loop @@ -457,6 +444,16 @@ serverLocal clock call = \(LocalSteps steps) -> do isExpectedElem _ (Left _) = False isExpectedElem expectedElem (Right streamElem) = expectedElem == streamElem + isClientDisconnected :: + Either Server.ClientDisconnected (StreamElem NoMetadata Int) + -> Bool + isClientDisconnected (Left (Server.ClientDisconnected e _)) + | Just HTTP2.Client.ConnectionIsClosed <- fromException e + = True + | otherwise + = False + isClientDisconnected _ = False + serverGlobal :: HasCallStack => TestClock @@ -495,8 +492,10 @@ serverGlobal clock globalStepsVar call = do Top-level -------------------------------------------------------------------------------} -execGlobalSteps :: GlobalSteps -> IO ClientServerTest -execGlobalSteps steps = do +data ConnUsage = SharedConn | ConnPerRPC + +execGlobalSteps :: ConnUsage -> GlobalSteps -> IO ClientServerTest +execGlobalSteps connUsage steps = do globalStepsVar <- newMVar (order steps) clock <- TestClock.new @@ -513,7 +512,7 @@ execGlobalSteps steps = do expectEarlyClientTermination = clientTerminatesEarly , expectEarlyServerTermination = serverTerminatesEarly } - , client = clientGlobal clock connPerRPC steps + , client = clientGlobal clock connUsage steps , server = [ handler (Proxy @TestRpc1) , handler (Proxy @TestRpc2) @@ -524,9 +523,6 @@ execGlobalSteps steps = do clientTerminatesEarly, serverTerminatesEarly :: Bool (clientTerminatesEarly, serverTerminatesEarly) = hasEarlyTermination steps - connPerRPC :: Bool - connPerRPC = serverTerminatesEarly || clientTerminatesEarly - -- For 'clientGlobal' the order doesn't matter, because it spawns a thread -- for each 'LocalSteps'. The server however doesn't get this option; the -- threads /get/ spawnwed for each incoming connection, and must feel off diff --git a/test-grapesy/Test/Prop/Dialogue.hs b/test-grapesy/Test/Prop/Dialogue.hs index d7bab278..69d5d1c2 100644 --- a/test-grapesy/Test/Prop/Dialogue.hs +++ b/test-grapesy/Test/Prop/Dialogue.hs @@ -15,39 +15,48 @@ import Test.Driver.Dialogue tests :: TestTree tests = testGroup "Test.Prop.Dialogue" [ testGroup "Regression" [ - testCase "trivial1" $ regression trivial1 - , testCase "trivial2" $ regression trivial2 - , testCase "trivial3" $ regression trivial3 - , testCase "concurrent1" $ regression concurrent1 - , testCase "concurrent2" $ regression concurrent2 - , testCase "concurrent3" $ regression concurrent3 - , testCase "concurrent4" $ regression concurrent4 - , testCase "exception1" $ regression exception1 - , testCase "exception2" $ regression exception2 - , testCase "earlyTermination01" $ regression earlyTermination01 - , testCase "earlyTermination02" $ regression earlyTermination02 - , testCase "earlyTermination03" $ regression earlyTermination03 - , testCase "earlyTermination04" $ regression earlyTermination04 - , testCase "earlyTermination05" $ regression earlyTermination05 - , testCase "earlyTermination06" $ regression earlyTermination06 - , testCase "earlyTermination07" $ regression earlyTermination07 - , testCase "earlyTermination08" $ regression earlyTermination08 - , testCase "earlyTermination09" $ regression earlyTermination09 - , testCase "earlyTermination10" $ regression earlyTermination10 - , testCase "earlyTermination11" $ regression earlyTermination11 - , testCase "earlyTermination12" $ regression earlyTermination12 - , testCase "earlyTermination13" $ regression earlyTermination13 - , testCase "earlyTermination14" $ regression earlyTermination14 - , testCase "allowHalfClosed1" $ regression allowHalfClosed1 - , testCase "allowHalfClosed2" $ regression allowHalfClosed2 - , testCase "allowHalfClosed3" $ regression allowHalfClosed3 + testCase "trivial1" $ regression SharedConn trivial1 + , testCase "trivial2" $ regression SharedConn trivial2 + , testCase "trivial3" $ regression SharedConn trivial3 + , testCase "concurrent1" $ regression SharedConn concurrent1 + , testCase "concurrent2" $ regression SharedConn concurrent2 + , testCase "concurrent3" $ regression SharedConn concurrent3 + , testCase "concurrent4" $ regression SharedConn concurrent4 + , testCase "exception1" $ regression ConnPerRPC exception1 + , testCase "exception2" $ regression ConnPerRPC exception2 + , testCase "earlyTermination01" $ regression ConnPerRPC earlyTermination01 + , testCase "earlyTermination02" $ regression ConnPerRPC earlyTermination02 + , testCase "earlyTermination03" $ regression ConnPerRPC earlyTermination03 + , testCase "earlyTermination04" $ regression ConnPerRPC earlyTermination04 + , testCase "earlyTermination05" $ regression ConnPerRPC earlyTermination05 + , testCase "earlyTermination06" $ regression ConnPerRPC earlyTermination06 + , testCase "earlyTermination07" $ regression ConnPerRPC earlyTermination07 + , testCase "earlyTermination08" $ regression ConnPerRPC earlyTermination08 + , testCase "earlyTermination09" $ regression ConnPerRPC earlyTermination09 + , testCase "earlyTermination10" $ regression ConnPerRPC earlyTermination10 + , testCase "earlyTermination11" $ regression ConnPerRPC earlyTermination11 + , testCase "earlyTermination12" $ regression ConnPerRPC earlyTermination12 + , testCase "earlyTermination13" $ regression ConnPerRPC earlyTermination13 + , testCase "earlyTermination14" $ regression ConnPerRPC earlyTermination14 + , testCase "unilateralTermination1" $ regression SharedConn unilateralTermination1 + , testCase "unilateralTermination2" $ regression SharedConn unilateralTermination2 + , testCase "unilateralTermination3" $ regression SharedConn unilateralTermination3 + , testCase "allowHalfClosed1" $ regression SharedConn allowHalfClosed1 + , testCase "allowHalfClosed2" $ regression SharedConn allowHalfClosed2 + , testCase "allowHalfClosed3" $ regression ConnPerRPC allowHalfClosed3 ] , testGroup "Setup" [ testProperty "shrinkingWellFounded" prop_shrinkingWellFounded ] , testGroup "Arbitrary" [ - testProperty "withoutExceptions" arbitraryWithoutExceptions - , testProperty "withExceptions" arbitraryWithExceptions + testGroup "WithoutExceptions" [ + testProperty "connPerRPC" $ arbitraryWithoutExceptions ConnPerRPC + , testProperty "sharedConn" $ arbitraryWithoutExceptions SharedConn + ] + , testGroup "WithExceptions" [ + testProperty "connPerRPC" $ arbitraryWithExceptions ConnPerRPC + , testProperty "sharedConn" $ arbitraryWithExceptions SharedConn + ] ] ] @@ -66,26 +75,26 @@ prop_shrinkingWellFounded = Running the tests -------------------------------------------------------------------------------} -arbitraryWithoutExceptions :: DialogueWithoutExceptions -> Property -arbitraryWithoutExceptions (DialogueWithoutExceptions dialogue) = - propDialogue dialogue +arbitraryWithoutExceptions :: ConnUsage -> DialogueWithoutExceptions -> Property +arbitraryWithoutExceptions connUsage (DialogueWithoutExceptions dialogue) = + propDialogue connUsage dialogue -arbitraryWithExceptions :: DialogueWithExceptions -> Property -arbitraryWithExceptions (DialogueWithExceptions dialogue) = - propDialogue dialogue +arbitraryWithExceptions :: ConnUsage -> DialogueWithExceptions -> Property +arbitraryWithExceptions connUsage (DialogueWithExceptions dialogue) = + propDialogue connUsage dialogue -propDialogue :: Dialogue -> Property -propDialogue dialogue = +propDialogue :: ConnUsage -> Dialogue -> Property +propDialogue connUsage dialogue = counterexample (show globalSteps) $ - propClientServer $ execGlobalSteps globalSteps + propClientServer $ execGlobalSteps connUsage globalSteps where globalSteps :: GlobalSteps globalSteps = dialogueGlobalSteps dialogue -regression :: Dialogue -> IO () -regression dialogue = +regression :: ConnUsage -> Dialogue -> IO () +regression connUsage dialogue = handle (throwIO . RegressionTestFailed globalSteps) $ - testClientServer =<< execGlobalSteps globalSteps + testClientServer =<< execGlobalSteps connUsage globalSteps where globalSteps :: GlobalSteps globalSteps = dialogueGlobalSteps dialogue @@ -359,6 +368,35 @@ earlyTermination14 = NormalizedDialogue [ , (0, ServerAction $ Terminate (Just (SomeServerException 0))) ] +unilateralTermination1 :: Dialogue +unilateralTermination1 = NormalizedDialogue [ + (1,ClientAction (Initiate (def,RPC1))) + , (1,ServerAction (Send (FinalElem 0 def))) + , (0,ClientAction (Initiate (def,RPC1))) + , (0,ClientAction (Send (NoMoreElems NoMetadata))) + , (0,ServerAction (Send (NoMoreElems def))) + ] + +unilateralTermination2 :: Dialogue +unilateralTermination2 = NormalizedDialogue [ + (1,ClientAction (Initiate (def,RPC1))) + , (1,ServerAction (Send (FinalElem 0 def))) + , (0,ClientAction (Initiate (def,RPC1))) + , (0,ClientAction (Send (NoMoreElems NoMetadata))) + , (0,ServerAction (Send (NoMoreElems def))) + , (2,ClientAction (Initiate (def,RPC1))) + , (2,ServerAction (Send (FinalElem 0 def))) + ] + +unilateralTermination3 :: Dialogue +unilateralTermination3 = NormalizedDialogue [ + (0, ClientAction (Initiate (def,RPC1))) + , (0, ServerAction (Send (NoMoreElems def))) + , (1, ClientAction (Initiate (def,RPC1))) + , (1, ClientAction (Send (FinalElem 0 NoMetadata))) + , (1, ServerAction (Send (NoMoreElems def))) + ] + {------------------------------------------------------------------------------- Dealing correctly with 'AllowHalfClosed' diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs index 0e97e6a5..756a92a9 100644 --- a/test-grapesy/Test/Sanity/Exception.hs +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -46,7 +46,7 @@ tests = testGroup "Test.Sanity.Exception" [ -- | Client makes many concurrent calls, throws an exception during one of them. test_clientException :: IO () test_clientException = testClientServer $ ClientServerTest { - config = def + config = def { expectEarlyClientTermination = True } , client = simpleTestClient $ \conn -> do -- Make 100 concurrent calls. 99 of them counting to 50, and one -- more that throws an exception once it reaches 10. @@ -129,22 +129,23 @@ test_serverException = do -- does not wait for client termination. test_earlyTerminationNoWait :: IO () test_earlyTerminationNoWait = testClientServer $ ClientServerTest { - config = def + config = def { expectEarlyClientTermination = True } , client = simpleTestClient $ \conn -> do _mResult <- try @DeliberateException $ Client.withRPC conn def (Proxy @Trivial) $ \_call -> throwIO (DeliberateException $ SomeServerException 0) - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + result <- Client.withRPC conn def (Proxy @Trivial) $ \call -> do Binary.sendFinalInput @Word8 call 0 - _output <- Binary.recvOutput @Word8 call - return () + Binary.recvFinalOutput @Word8 call + + assertEqual "" (1, NoMetadata) result , server = [ Server.someRpcHandler $ Server.mkRpcHandler @Trivial $ \call -> Binary.recvInput @Word8 call >>= \case - _ -> Server.sendTrailers call NoMetadata + _ -> Binary.sendFinalOutput @Word8 call (1, NoMetadata) ] } @@ -180,10 +181,5 @@ incUntilFinal call = do FinalElem n _ -> do Binary.sendFinalOutput @Word64 call (succ n, NoMetadata) NoMoreElems _ -> do - -- TODO: - -- - -- We shouldn't need to handle this case, since our client never - -- explicitly sends 'NoMoreElems'. However, see discussion in the - -- ticket above. - Server.sendTrailers call NoMetadata + -- This is never hit, since our client never sends 'NoMoreElems'. return () diff --git a/util/Network/GRPC/Util/Session.hs b/util/Network/GRPC/Util/Session.hs index ccff3e04..869361d0 100644 --- a/util/Network/GRPC/Util/Session.hs +++ b/util/Network/GRPC/Util/Session.hs @@ -44,6 +44,7 @@ module Network.GRPC.Util.Session ( -- ** Construction -- *** Client , ConnectionToServer(..) + , CancelRequest , setupRequestChannel -- *** Server , ConnectionToClient(..) diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index ebb87de9..1d6c5826 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -434,7 +434,7 @@ close Channel{channelOutbound} reason = do AlreadyTerminated _ -> return $ Nothing AlreadyAborted _err -> - -- Connection_ to the peer was lost prior to closing + -- Connection to the peer was lost prior to closing return $ Nothing Cancelled -> -- Proper procedure for outbound messages was not followed diff --git a/util/Network/GRPC/Util/Session/Client.hs b/util/Network/GRPC/Util/Session/Client.hs index 5ea1df51..bfd74553 100644 --- a/util/Network/GRPC/Util/Session/Client.hs +++ b/util/Network/GRPC/Util/Session/Client.hs @@ -2,10 +2,13 @@ module Network.GRPC.Util.Session.Client ( ConnectionToServer(..) , NoTrailers(..) + , CancelRequest , setupRequestChannel ) where +import Control.Concurrent import Control.Concurrent.STM +import Control.Monad import Control.Monad.Catch import Data.ByteString qualified as BS.Strict import Data.ByteString qualified as Strict (ByteString) @@ -83,6 +86,8 @@ class NoTrailers sess where -- | There is no interesting information in the trailers noTrailers :: Proxy sess -> Trailers (Outbound sess) +type CancelRequest = Maybe SomeException -> IO () + -- | Setup request channel -- -- This initiates a new request. @@ -95,7 +100,7 @@ setupRequestChannel :: forall sess. -- ^ We assume that when the server closes their outbound connection to us, -- the entire conversation is over (i.e., the server cannot "half-close"). -> FlowStart (Outbound sess) - -> IO (Channel sess) + -> IO (Channel sess, CancelRequest) setupRequestChannel sess ConnectionToServer{sendRequest} terminateCall @@ -104,6 +109,10 @@ setupRequestChannel sess channel <- initChannel let requestInfo = buildRequestInfo sess outboundStart + cancelRequestVar <- newEmptyMVar + let cancelRequest :: CancelRequest + cancelRequest e = join . (fmap ($ e)) $ readMVar cancelRequestVar + case outboundStart of FlowStartRegular headers -> do regular <- initFlowStateRegular headers @@ -112,7 +121,7 @@ setupRequestChannel sess (requestMethod requestInfo) (requestPath requestInfo) (requestHeaders requestInfo) - $ outboundThread channel regular + $ outboundThread channel cancelRequestVar regular forkRequest channel req FlowStartNoMessages trailers -> do let state :: FlowState (Outbound sess) @@ -123,6 +132,8 @@ setupRequestChannel sess (requestMethod requestInfo) (requestPath requestInfo) (requestHeaders requestInfo) + -- Can't cancel non-streaming request + putMVar cancelRequestVar $ \_ -> return () atomically $ modifyTVar (channelOutbound channel) $ \oldState -> case oldState of @@ -132,7 +143,7 @@ setupRequestChannel sess error "setupRequestChannel: expected thread state" forkRequest channel req - return channel + return (channel, cancelRequest) where _ = addConstraint @(NoTrailers sess) @@ -174,12 +185,14 @@ setupRequestChannel sess outboundThread :: Channel sess + -> MVar CancelRequest -> RegularFlowState (Outbound sess) -> Client.OutBodyIface -> IO () - outboundThread channel regular iface = + outboundThread channel cancelRequestVar regular iface = threadBody "grapesy:clientOutbound" (channelOutbound channel) $ \markReady _debugId -> do markReady $ FlowStateRegular regular + putMVar cancelRequestVar (Client.outBodyCancel iface) stream <- clientOutputStream iface -- Unlike the client inbound thread, or the inbound/outbound threads -- of the server, http2 knows about this particular thread and may