From 096d399b30d16da5ee3991660cdaeeb678f60efc Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Mon, 15 Aug 2016 12:55:40 -0500 Subject: [PATCH] MONAPP-1035: Fix errant hang on metadata exchange in Haskell client-side bidirectional streaming (#70) * (wip) Change ClientRWHandler to use an IO action for metadata acqui * Demonstrate placement of WaitForInitialMetadata() in hellos_client * Make hellos cpp server always send metadata first, for now * Show getMD placement in haskell hellos-client * Add prelim bidi metadata exchange behavior tweaks to line up with C++ API conventions --- examples/hellos/hellos-client/Main.hs | 3 +- examples/hellos/hellos-cpp/hellos_client.cc | 3 + examples/hellos/hellos-cpp/hellos_server.cc | 1 + src/Network/GRPC/LowLevel/Client.hs | 101 +++++++++++++++----- src/Network/GRPC/LowLevel/Op.hs | 22 +++-- tests/LowLevelTests.hs | 6 +- 6 files changed, 101 insertions(+), 35 deletions(-) diff --git a/examples/hellos/hellos-client/Main.hs b/examples/hellos/hellos-client/Main.hs index 0b87a28..4519c5f 100644 --- a/examples/hellos/hellos-client/Main.hs +++ b/examples/hellos/hellos-client/Main.hs @@ -84,7 +84,7 @@ doHelloBi c n = do let pay = BiRqtRpy "bidi payload" enc = BL.toStrict . toLazyByteString $ pay err desc e = fail $ "doHelloBi: " ++ desc ++ " error: " ++ show e - eea <- clientRW c rm n mempty $ \_ recv send writesDone -> do + eea <- clientRW c rm n mempty $ \_getMD recv send writesDone -> do -- perform n writes on a worker thread thd <- async $ do replicateM_ n $ send enc >>= \case @@ -94,6 +94,7 @@ doHelloBi c n = do Left e -> err "writesDone" e _ -> return () -- perform reads on this thread until the stream is terminated + -- emd <- getMD; putStrLn ("getMD result: " ++ show emd) fix $ \go -> recv >>= \case Left e -> err "recv" e Right Nothing -> return () diff --git a/examples/hellos/hellos-cpp/hellos_client.cc b/examples/hellos/hellos-cpp/hellos_client.cc index 661e53d..a61307e 100644 --- a/examples/hellos/hellos-cpp/hellos_client.cc +++ b/examples/hellos/hellos-cpp/hellos_client.cc @@ -94,6 +94,8 @@ class HellosClient { std::shared_ptr > strm(stub_->HelloBi(&ctx)); + // strm->WaitForInitialMetadata(); + // Spawn a writer thread which sends rqt to the server n times. std::thread writer([strm,rqt,n]() { for(unsigned i = 0; i < n; ++i) { @@ -107,6 +109,7 @@ class HellosClient { // there's nothing left to read. BiRqtRpy rpy; unsigned rpyCnt = 0; + while(strm->Read(&rpy)) { if (rpy.message() != pay) Die("DoHelloBi/rpy: expected payload '" + pay + diff --git a/examples/hellos/hellos-cpp/hellos_server.cc b/examples/hellos/hellos-cpp/hellos_server.cc index 988b271..2c7ae2b 100644 --- a/examples/hellos/hellos-cpp/hellos_server.cc +++ b/examples/hellos/hellos-cpp/hellos_server.cc @@ -56,6 +56,7 @@ class HellosImpl final : public Hellos::Service { Status HelloBi(ServerContext* context, ServerReaderWriter* strm) override { BiRqtRpy rqt; + strm->SendInitialMetadata(); while (strm->Read(&rqt)) { strm->Write(rqt); } diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index da912e7..48e5ad5 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -3,6 +3,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE ViewPatterns #-} -- | This module defines data structures and operations pertaining to registered @@ -11,6 +12,7 @@ module Network.GRPC.LowLevel.Client where import Control.Exception (bracket, finally) +import Control.Concurrent.MVar import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Except @@ -263,41 +265,94 @@ pattern CWRFinal mmsg initMD trailMD st ds -- clientRW (client side of bidirectional streaming mode) type ClientRWHandler - = MetadataMap + = IO (Either GRPCIOError MetadataMap) -> StreamRecv ByteString -> StreamSend ByteString -> WritesDone -> IO () type ClientRWResult = (MetadataMap, C.StatusCode, StatusDetails) --- | The most generic version of clientRW. It does not assume anything about --- threading model; caller must invoke the WritesDone operation, exactly once, --- for the half-close, after all threads have completed writing. TODO: It'd be --- nice to find a way to type-enforce this usage pattern rather than accomplish --- it via usage convention and documentation. clientRW :: Client -> RegisteredMethod 'BiDiStreaming -> TimeoutSeconds -> MetadataMap -> ClientRWHandler -> IO (Either GRPCIOError ClientRWResult) -clientRW cl@(clientCQ -> cq) rm tm initMeta f = withClientCall cl rm tm go - where - go (unsafeCC -> c) = runExceptT $ do - sendInitialMetadata c cq initMeta - srvMeta <- recvInitialMetadata c cq - liftIO $ f srvMeta (streamRecvPrim c cq) (streamSendPrim c cq) (writesDonePrim c cq) - -- NB: We could consider having the passed writesDone action safely set a - -- flag once it had been called, and invoke it ourselves if not set after - -- returning from the handler (although this is actually borked in the - -- concurrent case, because a reader may remain blocked without the - -- half-close and thus not return control to us -- doh). Alternately, we - -- can document just this general-purpose function well, and then create - -- slightly simpler versions of the bidi interface which support (a) - -- monothreaded send/recv interleaving with implicit half-close and (b) - -- send/recv threads with implicit half-close after writer thread - -- termination. - recvStatusOnClient c cq -- Finish() +clientRW cl rm tm initMeta f = + withClientCall cl rm tm (\cc -> clientRW' cl cc initMeta f) + +-- | The most generic version of clientRW. It does not assume anything about +-- threading model; caller must invoke the WritesDone operation, exactly once, +-- for the half-close, after all threads have completed writing. TODO: It'd be +-- nice to find a way to type-enforce this usage pattern rather than accomplish +-- it via usage convention and documentation. +clientRW' :: Client + -> ClientCall + -> MetadataMap + -> ClientRWHandler + -> IO (Either GRPCIOError ClientRWResult) +clientRW' (clientCQ -> cq) (unsafeCC -> c) initMeta f = runExceptT $ do + sendInitialMetadata c cq initMeta + + -- 'mdmv' is used to synchronize between callers of 'getMD' and 'recv' + -- below. The behavior of these two operations is different based on their + -- call order w.r.t. each other, and by whether or not metadata has already + -- been received. + -- + -- Regardless of call order, metadata reception is done exactly once. The + -- result of doing so is cached for subsequent calls to 'getMD'. + -- + -- 'getMD' will always return the received metadata (or an error if it + -- occurred), regardless of call order. + -- + -- When 'getMD' is invoked before 'recv' (and no metadata has been obtained), + -- metadata is received via a singleton batch, returned, and cached for later + -- access via 'getMD'. This scenario is analagous to preceding the first read + -- with WaitForInitialMetadata() in the C++ API. + -- + -- When 'recv' is invoked before 'getMD' (and no metadata has been obtained), + -- metadata is received alongside the first payload via an aggregate batch, + -- and cached for later access via 'getMD'. This scenario is analagous to just + -- issuing Read() in the C++ API, and having the metadata available via + -- ClientContext afterwards. + -- + -- TODO: This is not the whole story about metadata exchange ordering, but + -- allows us to at least have parity on the client side with the C++ API + -- w.r.t. when/how metadata is exchanged. We may need to revisit this a bit as + -- we experiment with other bindings, new GRPC releases come out, and so + -- forth, but at least this provides us with basic functionality, albeit with + -- a great deal more caveat programmer than desirable :( + + mdmv <- liftIO (newMVar Nothing) + let + getMD = modifyMVar mdmv $ \case + Just emd -> return (Just emd, emd) + Nothing -> do -- getMD invoked before recv + emd <- runExceptT (recvInitialMetadata c cq) + return (Just emd, emd) + + recv = modifyMVar mdmv $ \case + Just emd -> (Just emd,) <$> streamRecvPrim c cq + Nothing -> -- recv invoked before getMD + runExceptT (recvInitialMsgMD c cq) >>= \case + Left e -> return (Just (Left e), Left e) + Right (mbs, md) -> return (Just (Right md), Right mbs) + + send = streamSendPrim c cq + + -- TODO: Regarding usage of writesDone so that there isn't such a burden on + -- the end user programmer (i.e. must invoke it, and only once): we can just + -- document this general-purpose function well, and then create slightly + -- simpler versions of the bidi interface which support (a) monothreaded + -- send/recv interleaving, with an implicit half-close and (b) separate + -- send/recv threads, with an implicit half-close after the writer thread + -- terminates. These simpler versions model the most common use cases + -- without having to expose the half-close semantics to the end user + -- programmer. + writesDone = writesDonePrim c cq + + liftIO (f getMD recv send writesDone) + recvStatusOnClient c cq -- Finish() -------------------------------------------------------------------------------- -- clientRequest (client side of normal request/response) diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index c448d46..c895753 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -268,6 +268,8 @@ type RecvSingle a -> CompletionQueue -> ExceptT GRPCIOError IO a +pattern RecvMsgRslt mmsg <- Right [OpRecvMessageResult mmsg] + sendSingle :: SendSingle Op sendSingle c cq op = void (runOps' c cq [op]) @@ -278,25 +280,29 @@ sendStatusFromServer :: SendSingle (MetadataMap, C.StatusCode, StatusDetails) sendStatusFromServer c cq (md, st, ds) = sendSingle c cq (OpSendStatusFromServer md st ds) +recvInitialMessage :: RecvSingle ByteString +recvInitialMessage c cq = ExceptT (streamRecvPrim c cq ) >>= \case + Nothing -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMessage: no message.") + Just bs -> return bs + recvInitialMetadata :: RecvSingle MetadataMap recvInitialMetadata c cq = runOps' c cq [OpRecvInitialMetadata] >>= \case [OpRecvInitialMetadataResult md] -> return md _ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMetadata") +recvInitialMsgMD :: RecvSingle (Maybe ByteString, MetadataMap) +recvInitialMsgMD c cq = runOps' c cq [OpRecvInitialMetadata, OpRecvMessage] >>= \case + [ OpRecvInitialMetadataResult md, OpRecvMessageResult mmsg] + -> return (mmsg, md) + _ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMsgMD") + recvStatusOnClient :: RecvSingle (MetadataMap, C.StatusCode, StatusDetails) recvStatusOnClient c cq = runOps' c cq [OpRecvStatusOnClient] >>= \case [OpRecvStatusOnClientResult md st ds] -> return (md, st, StatusDetails ds) _ -> throwE (GRPCIOInternalUnexpectedRecv "recvStatusOnClient") -recvInitialMessage :: RecvSingle ByteString -recvInitialMessage c cq = runOps' c cq [OpRecvMessage] >>= \case - [OpRecvMessageResult (Just bs)] - -> return bs - [OpRecvMessageResult Nothing] - -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMessage: no message.") - _ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMessage") -------------------------------------------------------------------------------- -- Streaming types and helpers @@ -309,8 +315,6 @@ streamRecvPrim c cq = f <$> runOps c cq [OpRecvMessage] f Right{} = Left (GRPCIOInternalUnexpectedRecv "streamRecvPrim") f (Left e) = Left e -pattern RecvMsgRslt mmsg <- Right [OpRecvMessageResult mmsg] - type StreamSend a = a -> IO (Either GRPCIOError ()) streamSendPrim :: C.Call -> CompletionQueue -> StreamSend ByteString streamSendPrim c cq bs = f <$> runOps c cq [OpSendMessage bs] diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index fb10593..5ecee59 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -289,7 +289,8 @@ testBiDiStreaming = client c = do rm <- clientRegisterMethodBiDiStreaming c "/bidi" - eea <- clientRW c rm 10 clientInitMD $ \_srvInitMD recv send writesDone -> do + eea <- clientRW c rm 10 clientInitMD $ \getMD recv send writesDone -> do + either clientFail (checkMD "Server rsp metadata mismatch" serverInitMD) =<< getMD send "cw0" `is` Right () recv `is` Right (Just "sw0") send "cw1" `is` Right () @@ -324,7 +325,8 @@ testBiDiStreamingUnregistered = client c = do rm <- clientRegisterMethodBiDiStreaming c "/bidi" - eea <- clientRW c rm 10 clientInitMD $ \_srvInitMD recv send writesDone -> do + eea <- clientRW c rm 10 clientInitMD $ \getMD recv send writesDone -> do + either clientFail (checkMD "Server rsp metadata mismatch" serverInitMD) =<< getMD send "cw0" `is` Right () recv `is` Right (Just "sw0") send "cw1" `is` Right ()