From ff73b5eb5aa9971118bd8fcacc300abb80b9e72f Mon Sep 17 00:00:00 2001 From: Connor Clark Date: Wed, 22 Jun 2016 10:41:14 -0700 Subject: [PATCH] serverRequestCall: block until request received (#30) * fix error for client timeouts * registered calls: block until call received, simplify cleanup * unregistered calls: block until request received * Fix up tests --- examples/echo/echo-server/Main.hs | 12 +- src/Network/GRPC/LowLevel/Call.hs | 39 +------ .../GRPC/LowLevel/Call/Unregistered.hs | 25 +---- src/Network/GRPC/LowLevel/CompletionQueue.hs | 105 ++++++++++-------- .../GRPC/LowLevel/CompletionQueue/Internal.hs | 1 + .../LowLevel/CompletionQueue/Unregistered.hs | 86 +++++++------- src/Network/GRPC/LowLevel/Server.hs | 19 ++-- .../GRPC/LowLevel/Server/Unregistered.hs | 37 +++--- src/Network/GRPC/Unsafe.chs | 4 + tests/LowLevelTests.hs | 71 +++--------- tests/LowLevelTests/Op.hs | 2 +- 11 files changed, 160 insertions(+), 241 deletions(-) diff --git a/examples/echo/echo-server/Main.hs b/examples/echo/echo-server/Main.hs index dac5fdb..9315f0c 100644 --- a/examples/echo/echo-server/Main.hs +++ b/examples/echo/echo-server/Main.hs @@ -1,5 +1,6 @@ {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_GHC -fno-warn-unused-binds #-} @@ -13,9 +14,10 @@ import qualified Network.GRPC.LowLevel.Call.Unregistered as U serverMeta :: MetadataMap serverMeta = [("test_meta", "test_meta_value")] -handler :: U.ServerCall -> ByteString -> MetadataMap -> MethodName +handler :: U.ServerCall + -> ByteString -> IO (ByteString, MetadataMap, StatusCode, StatusDetails) -handler _call reqBody _reqMeta _method = do +handler U.ServerCall{..} reqBody = do --putStrLn $ "Got request for method: " ++ show method --putStrLn $ "Got metadata: " ++ show reqMeta return (reqBody, serverMeta, StatusOk, StatusDetails "") @@ -23,7 +25,7 @@ handler _call reqBody _reqMeta _method = do unregMain :: IO () unregMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do - result <- U.serverHandleNormalCall server 15 serverMeta handler + result <- U.serverHandleNormalCall server serverMeta handler case result of Left x -> putStrLn $ "handle call result error: " ++ show x Right _ -> return () @@ -34,7 +36,7 @@ regMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> forever $ do let method = head (registeredMethods server) - result <- serverHandleNormalCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method serverMeta $ \_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk, StatusDetails "") case result of @@ -44,7 +46,7 @@ regMain = withGRPC $ \grpc -> do -- | loop to fork n times regLoop :: Server -> RegisteredMethod -> IO () regLoop server method = forever $ do - result <- serverHandleNormalCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method serverMeta $ \_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk, StatusDetails "") case result of diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index aa8a25d..bd222c1 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -65,10 +65,9 @@ clientCallCancel cc = C.grpcCallCancel (unClientCall cc) C.reserved -- the C state needed to respond to a registered call. data ServerCall = ServerCall { unServerCall :: C.Call, - requestMetadataRecv :: Ptr C.MetadataArray, - optionalPayload :: Ptr C.ByteBuffer, + requestMetadataRecv :: MetadataMap, + optionalPayload :: Maybe ByteString, parentPtr :: Maybe (Ptr C.Call), - callDeadlinePtr :: C.CTimeSpecPtr, callDeadline :: TimeSpec } @@ -76,24 +75,6 @@ serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO () serverCallCancel sc code reason = C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved -serverCallGetMetadata :: ServerCall -> IO MetadataMap -serverCallGetMetadata ServerCall{..} = do - marray <- peek requestMetadataRecv - C.getAllMetadataArray marray - --- | Extract the client request body from the given call, if present. TODO: the --- reason this returns @Maybe ByteString@ is because the gRPC library calls the --- underlying out parameter "optional_payload". I am not sure exactly in what --- cases it won't be present. The C++ library checks a has_request_payload_ bool --- and passes in nullptr to request_registered_call if the bool is false, so we --- may be able to do the payload present/absent check earlier. -serverCallGetPayload :: ServerCall -> IO (Maybe ByteString) -serverCallGetPayload ServerCall{..} = do - bb@(C.ByteBuffer rawPtr) <- peek optionalPayload - if rawPtr == nullPtr - then return Nothing - else Just <$> C.copyByteBufferToByteString bb - serverCallIsExpired :: ServerCall -> IO Bool serverCallIsExpired sc = do currTime <- getTime Monotonic @@ -110,24 +91,16 @@ debugClientCall = const $ return () debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _ _) = do +debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) grpcDebug $ "debugServerCall(R): metadata ptr: " ++ show (requestMetadataRecv call) - metadataArr <- peek (requestMetadataRecv call) - metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata) grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call) - payload <- peek (optionalPayload call) - bs <- C.copyByteBufferToByteString payload - grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs forM_ (parentPtr call) $ \parentPtr' -> do grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr' (C.Call parent) <- peek parentPtr' grpcDebug $ "debugServerCall(R): parent: " ++ show parent grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) - timespec <- peek (callDeadlinePtr call) - grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec) #else {-# INLINE debugServerCall #-} debugServerCall = const $ return () @@ -144,11 +117,5 @@ destroyServerCall call@ServerCall{..} = do debugServerCall call grpcDebug $ "Destroying server-side call object: " ++ show unServerCall C.grpcCallDestroy unServerCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv - C.metadataArrayDestroy requestMetadataRecv - grpcDebug $ "destroying optional payload" ++ show optionalPayload - C.destroyReceivingByteBuffer optionalPayload grpcDebug $ "freeing parentPtr: " ++ show parentPtr forM_ parentPtr free - grpcDebug $ "destroying deadline." ++ show callDeadline - C.timespecDestroy callDeadlinePtr diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs index b14537a..361de7c 100644 --- a/src/Network/GRPC/LowLevel/Call/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -6,6 +6,8 @@ import Control.Monad import Foreign.Marshal.Alloc (free) import Foreign.Ptr (Ptr) import Foreign.Storable (peek) +import System.Clock (TimeSpec) + import Network.GRPC.LowLevel.Call (Host (..), MethodName (..)) import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) import qualified Network.GRPC.Unsafe as C @@ -17,28 +19,17 @@ import qualified Network.GRPC.Unsafe.Op as C -- call. data ServerCall = ServerCall { unServerCall :: C.Call - , requestMetadataRecv :: Ptr C.MetadataArray + , requestMetadataRecv :: MetadataMap , parentPtr :: Maybe (Ptr C.Call) - , callDetails :: C.CallDetails + , callDeadline :: TimeSpec + , callMethod :: MethodName + , callHost :: Host } serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO () serverCallCancel sc code reason = C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved -serverCallGetMetadata :: ServerCall -> IO MetadataMap -serverCallGetMetadata ServerCall{..} = do - marray <- peek requestMetadataRecv - C.getAllMetadataArray marray - -serverCallGetMethodName :: ServerCall -> IO MethodName -serverCallGetMethodName ServerCall{..} = - MethodName <$> C.callDetailsGetMethod callDetails - -serverCallGetHost :: ServerCall -> IO Host -serverCallGetHost ServerCall{..} = - Host <$> C.callDetailsGetHost callDetails - debugServerCall :: ServerCall -> IO () #ifdef DEBUG debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do @@ -66,9 +57,5 @@ destroyServerCall call@ServerCall{..} = do debugServerCall call grpcDebug $ "Destroying server-side call object: " ++ show unServerCall C.grpcCallDestroy unServerCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv - C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "freeing parentPtr: " ++ show parentPtr forM_ parentPtr free - grpcDebug $ "destroying call details: " ++ show callDetails - C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index ec060fd..0bac639 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -34,6 +34,7 @@ import Control.Concurrent.STM (atomically, check) import Control.Concurrent.STM.TVar (newTVarIO, readTVar, writeTVar) import Control.Exception (bracket) +import Control.Monad (liftM2) import Data.IORef (newIORef) import Data.List (intersperse) import Foreign.Marshal.Alloc (free, malloc) @@ -51,6 +52,8 @@ import System.Timeout (timeout) import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.CompletionQueue.Internal +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue grpc = bracket (createCompletionQueue grpc) @@ -131,61 +134,69 @@ channelCreateCall -- | Create the call object to handle a registered call. serverRequestCall :: C.Server -> CompletionQueue - -> TimeoutSeconds -> RegisteredMethod -> IO (Either GRPCIOError ServerCall) serverRequestCall - server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} = - withPermission Push cq $ do - -- NOTE: the below stuff is freed when we free the call we return. - deadlinePtr <- malloc - callPtr <- malloc - metadataArrayPtr <- C.metadataArrayCreate - metadataArray <- peek metadataArrayPtr - bbPtr <- malloc - tag <- newTag cq - grpcDebug $ "serverRequestCall(R): tag is " ++ show tag - callError <- C.grpcServerRequestRegisteredCall - server methodHandle callPtr deadlinePtr - metadataArray bbPtr unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestCall(R): callError: " - ++ show callError - if callError /= C.CallOk - then do grpcDebug "serverRequestCall(R): callError. cleaning up" - failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr - return $ Left $ GRPCIOCallError callError - else do pluckResult <- pluck cq tag (Just timeLimit) - grpcDebug "serverRequestCall(R): finished pluck." - case pluckResult of - Left x -> do - grpcDebug "serverRequestCall(R): cleanup pluck err" - failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr - return $ Left x - Right () -> do - rawCall <- peek callPtr - deadline <- convertDeadline deadlinePtr - let assembledCall = ServerCall rawCall metadataArrayPtr - bbPtr Nothing deadlinePtr deadline - return $ Right assembledCall - --TODO: the gRPC library appears to hold onto these pointers for a random - -- amount of time, even after returning from the only call that uses them. - -- This results in malloc errors if - -- gRPC tries to modify them after we free them. To work around it, - -- we sleep for a while before freeing the objects. We should find a - -- permanent solution that's more robust. - where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do - threadDelaySecs 30 - grpcDebug "serverRequestCall(R): doing delayed cleanup." - C.timespecDestroy deadline - free callPtr - C.metadataArrayDestroy metadataArrayPtr - free bbPtr - convertDeadline deadline = do + server cq@CompletionQueue{..} RegisteredMethod{..} = + withPermission Push cq $ + bracket (liftM2 (,) malloc malloc) + (\(p1,p2) -> free p1 >> free p2) + $ \(deadlinePtr, callPtr) -> + C.withByteBufferPtr $ \bbPtr -> + C.withMetadataArrayPtr $ \metadataArrayPtr -> do + metadataArray <- peek metadataArrayPtr + tag <- newTag cq + grpcDebug $ "serverRequestCall(R): tag is " ++ show tag + callError <- C.grpcServerRequestRegisteredCall + server methodHandle callPtr deadlinePtr + metadataArray bbPtr unsafeCQ unsafeCQ tag + grpcDebug $ "serverRequestCall(R): callError: " + ++ show callError + if callError /= C.CallOk + then do grpcDebug "serverRequestCall(R): callError. cleaning up" + return $ Left $ GRPCIOCallError callError + else do pluckResult <- pluck cq tag Nothing + grpcDebug $ "serverRequestCall(R): finished pluck:" + ++ show pluckResult + case pluckResult of + Left x -> do + grpcDebug "serverRequestCall(R): cleanup pluck err" + return $ Left x + Right () -> do + rawCall <- peek callPtr + deadline <- convertDeadline deadlinePtr + payload <- convertPayload bbPtr + meta <- convertMeta metadataArrayPtr + let assembledCall = ServerCall rawCall + meta + payload + Nothing + deadline + grpcDebug "serverRequestCall(R): About to return" + return $ Right assembledCall + where convertDeadline deadline = do --gRPC gives us a deadline that is just a delta, so we convert it --to a proper deadline. deadline' <- C.timeSpec <$> peek deadline now <- getTime Monotonic return $ now + deadline' + convertPayload bbPtr = do + -- TODO: the reason this returns @Maybe ByteString@ is because the + -- gRPC library calls the underlying out parameter + -- "optional_payload". I am not sure exactly in what cases it + -- won't be present. The C++ library checks a + -- has_request_payload_ bool and passes in nullptr to + -- request_registered_call if the bool is false, so we + -- may be able to do the payload present/absent check earlier. + bb@(C.ByteBuffer rawPtr) <- peek bbPtr + if rawPtr == nullPtr + then return Nothing + else do bs <- C.copyByteBufferToByteString bb + return $ Just bs + convertMeta requestMetadataRecv = do + mArray <- peek requestMetadataRecv + metamap <- C.getAllMetadataArray mArray + return metamap -- | Register the server's completion queue. Must be done before the server is -- started. diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs index 6b9090f..2e64cd4 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs @@ -121,6 +121,7 @@ pluck cq@CompletionQueue{..} tag waitSeconds = do eventToError :: C.Event -> (Either GRPCIOError a) eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout +eventToError (C.Event C.OpComplete False _) = Left GRPCIOTimeout eventToError _ = Left GRPCIOUnknownError -- | Returns true iff the given grpc_event was a success. diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs index bc9d0fe..f7b28f6 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -3,6 +3,7 @@ module Network.GRPC.LowLevel.CompletionQueue.Unregistered where import Control.Concurrent (forkIO) +import Control.Exception (bracket) import Foreign.Marshal.Alloc (free, malloc) import Foreign.Storable (peek) import Network.GRPC.LowLevel.Call @@ -31,50 +32,45 @@ channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = serverRequestCall :: C.Server -> CompletionQueue - -> TimeoutSeconds -> IO (Either GRPCIOError U.ServerCall) -serverRequestCall server cq@CompletionQueue{..} timeLimit = - withPermission Push cq $ do - callPtr <- malloc - grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr - callDetails <- C.createCallDetails - metadataArrayPtr <- C.metadataArrayCreate - metadataArray <- peek metadataArrayPtr - tag <- newTag cq - callError <- C.grpcServerRequestCall server callPtr callDetails - metadataArray unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestCall: callError was " ++ show callError - if callError /= C.CallOk - then do grpcDebug "serverRequestCall: got call error; cleaning up." - failureCleanup callPtr callDetails metadataArrayPtr - return $ Left $ GRPCIOCallError callError - else do pluckResult <- pluck cq tag (Just timeLimit) - grpcDebug $ "serverRequestCall: pluckResult was " - ++ show pluckResult - case pluckResult of - Left x -> do - grpcDebug "serverRequestCall: pluck error; cleaning up." - failureCleanup callPtr callDetails - metadataArrayPtr - return $ Left x - Right () -> do - rawCall <- peek callPtr - let call = U.ServerCall rawCall - metadataArrayPtr - Nothing - callDetails - return $ Right call +serverRequestCall server cq@CompletionQueue{..} = + withPermission Push cq $ + bracket malloc free $ \callPtr -> + C.withMetadataArrayPtr $ \metadataArrayPtr -> + C.withCallDetails $ \callDetails -> do + grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr + metadataArray <- peek metadataArrayPtr + tag <- newTag cq + callError <- C.grpcServerRequestCall server callPtr callDetails + metadataArray unsafeCQ unsafeCQ tag + grpcDebug $ "serverRequestCall: callError was " ++ show callError + if callError /= C.CallOk + then do grpcDebug "serverRequestCall: got call error; cleaning up." + return $ Left $ GRPCIOCallError callError + else do pluckResult <- pluck cq tag Nothing + grpcDebug $ "serverRequestCall: pluckResult was " + ++ show pluckResult + case pluckResult of + Left x -> do + grpcDebug "serverRequestCall: pluck error." + return $ Left x + Right () -> do + rawCall <- peek callPtr + metadata <- C.getAllMetadataArray metadataArray + deadline <- getDeadline callDetails + method <- getMethod callDetails + host <- getHost callDetails + let call = U.ServerCall rawCall + metadata + Nothing + deadline + method + host + return $ Right call - --TODO: the gRPC library appears to hold onto these pointers for a random - -- amount of time, even after returning from the only call that uses them. - -- This results in malloc errors if - -- gRPC tries to modify them after we free them. To work around it, - -- we sleep for a while before freeing the objects. We should find a - -- permanent solution that's more robust. - where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do - threadDelaySecs 30 - grpcDebug "serverRequestCall: doing delayed cleanup." - free callPtr - C.destroyCallDetails callDetails - C.metadataArrayDestroy metadataArrayPtr - return () + where getDeadline callDetails = do + C.timeSpec <$> C.callDetailsGetDeadline callDetails + getMethod callDetails = + MethodName <$> C.callDetailsGetMethod callDetails + getHost callDetails = + Host <$> C.callDetailsGetHost callDetails diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 80f401f..1bb52bc 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -126,18 +126,16 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- method. serverCreateCall :: Server -> RegisteredMethod - -> TimeoutSeconds -> IO (Either GRPCIOError ServerCall) -serverCreateCall Server{..} rm timeLimit = - serverRequestCall internalServer serverCQ timeLimit rm +serverCreateCall Server{..} rm = + serverRequestCall internalServer serverCQ rm withServerCall :: Server -> RegisteredMethod - -> TimeoutSeconds -> (ServerCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withServerCall server regmethod timeout f = do - createResult <- serverCreateCall server regmethod timeout +withServerCall server regmethod f = do + createResult <- serverCreateCall server regmethod case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call @@ -177,22 +175,21 @@ type ServerHandler -- | Wait for and then handle a normal (non-streaming) call. serverHandleNormalCall :: Server -> RegisteredMethod - -> TimeoutSeconds -> MetadataMap -- ^ Initial server metadata -> ServerHandler -> IO (Either GRPCIOError ()) -serverHandleNormalCall s@Server{..} rm timeLimit initMeta f = do - withServerCall s rm timeLimit $ \call -> do +serverHandleNormalCall s@Server{..} rm initMeta f = do + withServerCall s rm $ \call -> do grpcDebug "serverHandleNormalCall(R): starting batch." debugServerCall call - payload <- serverCallGetPayload call + let payload = optionalPayload call case payload of --TODO: what should we do with an empty payload? Have the handler take -- @Maybe ByteString@? Need to figure out when/why payload would be empty. Nothing -> error "serverHandleNormalCall(R): payload empty." Just requestBody -> do - requestMeta <- serverCallGetMetadata call + let requestMeta = requestMetadataRecv call (respBody, trailingMeta, status, details) <- f call requestBody requestMeta diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 66d4688..ccba940 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -13,16 +13,15 @@ import Network.GRPC.LowLevel.Op (Op(..), OpR import Network.GRPC.LowLevel.Server (Server (..)) import qualified Network.GRPC.Unsafe.Op as C -serverCreateCall :: Server -> TimeoutSeconds - -> IO (Either GRPCIOError ServerCall) -serverCreateCall Server{..} timeLimit = - serverRequestCall internalServer serverCQ timeLimit +serverCreateCall :: Server -> IO (Either GRPCIOError ServerCall) +serverCreateCall Server{..} = + serverRequestCall internalServer serverCQ -withServerCall :: Server -> TimeoutSeconds +withServerCall :: Server -> (ServerCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withServerCall server timeout f = do - createResult <- serverCreateCall server timeout +withServerCall server f = do + createResult <- serverCreateCall server case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call @@ -52,37 +51,29 @@ serverOpsSendNormalResponse body metadata code details = -- | A handler for an unregistered server call; bytestring arguments are the -- request body and response body respectively. type ServerHandler - = ServerCall -> ByteString -> MetadataMap -> MethodName + = ServerCall -> ByteString -> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails) -- | Handle one unregistered call. serverHandleNormalCall :: Server - -> TimeoutSeconds -> MetadataMap -- ^ Initial server metadata. -> ServerHandler -> IO (Either GRPCIOError ()) -serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do - withServerCall s timeLimit $ \call -> do +serverHandleNormalCall s@Server{..} srvMetadata f = do + withServerCall s $ \call@ServerCall{..} -> do grpcDebug "serverHandleNormalCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata - call' = unServerCall call - opResults <- runOps call' serverCQ recvOps + opResults <- runOps unServerCall serverCQ recvOps case opResults of Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" return $ Left x Right [OpRecvMessageResult (Just body)] -> do - requestMeta <- serverCallGetMetadata call - grpcDebug $ "got client metadata: " ++ show requestMeta - methodName <- serverCallGetMethodName call - hostName <- serverCallGetHost call - grpcDebug $ "call_details host is: " ++ show hostName - (respBody, respMetadata, status, details) <- f call - body - requestMeta - methodName + grpcDebug $ "got client metadata: " ++ show requestMetadataRecv + grpcDebug $ "call_details host is: " ++ show callHost + (respBody, respMetadata, status, details) <- f call body let respOps = serverOpsSendNormalResponse respBody respMetadata status details - respOpsResults <- runOps call' serverCQ respOps + respOpsResults <- runOps unServerCall serverCQ respOps case respOpsResults of Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." return $ Left x diff --git a/src/Network/GRPC/Unsafe.chs b/src/Network/GRPC/Unsafe.chs index 4c8cc45..87f4a14 100644 --- a/src/Network/GRPC/Unsafe.chs +++ b/src/Network/GRPC/Unsafe.chs @@ -2,6 +2,7 @@ module Network.GRPC.Unsafe where +import Control.Exception (bracket) import Control.Monad import Foreign.C.Types @@ -46,6 +47,9 @@ deriving instance Show CallDetails {#fun create_call_details as ^ {} -> `CallDetails'#} {#fun destroy_call_details as ^ {`CallDetails'} -> `()'#} +withCallDetails :: (CallDetails -> IO a) -> IO a +withCallDetails = bracket createCallDetails destroyCallDetails + -- instance def adapted from -- https://mail.haskell.org/pipermail/c2hs/2007-June/000800.html instance Storable Call where diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index f472324..004d0e6 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -11,6 +11,7 @@ import Control.Monad import Data.ByteString (ByteString) import qualified Data.Map as M import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Call.Unregistered as U import qualified Network.GRPC.LowLevel.Client.Unregistered as U import qualified Network.GRPC.LowLevel.Server.Unregistered as U import Test.Tasty @@ -29,9 +30,6 @@ lowLevelTests = testGroup "Unit tests of low-level Haskell library" , testClientCall , testClientTimeoutNoServer , testServerCreateDestroy - , testServerCall - , testServerTimeoutNoClient - , testWrongEndpoint , testMixRegisteredUnregistered , testPayload , testPayloadUnregistered @@ -65,42 +63,12 @@ testClientTimeoutNoServer = clientOnlyTest "request timeout when server DNE" $ \c -> do rm <- clientRegisterMethod c "/foo" Normal r <- clientRequest c rm 1 "Hello" mempty - r @?= Left GRPCIOUnknownError + r @?= Left GRPCIOTimeout testServerCreateDestroy :: TestTree testServerCreateDestroy = serverOnlyTest "start/stop" [] nop -testServerCall :: TestTree -testServerCall = - serverOnlyTest "create/destroy call" [] $ \s -> do - r <- U.withServerCall s 1 $ const $ return $ Right () - r @?= Left GRPCIOTimeout - -testServerTimeoutNoClient :: TestTree -testServerTimeoutNoClient = - serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do - let rm = head (registeredMethods s) - r <- serverHandleNormalCall s rm 1 mempty dummyHandler - r @?= Left GRPCIOTimeout - -testWrongEndpoint :: TestTree -testWrongEndpoint = - csTest "client requests unknown endpoint" client server [("/foo", Normal)] - where - -- TODO: possibly factor out dead-simple client/server construction even - -- further - client c = do - rm <- clientRegisterMethod c "/bar" Normal - r <- clientRequest c rm 1 "Hello!" mempty - r @?= Left (GRPCIOBadStatusCode StatusDeadlineExceeded - (StatusDetails "Deadline Exceeded")) - server s = do - length (registeredMethods s) @?= 1 - let rm = head (registeredMethods s) - r <- serverHandleNormalCall s rm 2 mempty dummyHandler - r @?= Left GRPCIOTimeout - testMixRegisteredUnregistered :: TestTree testMixRegisteredUnregistered = csTest "server uses unregistered calls to handle unknown endpoints" @@ -119,28 +87,21 @@ testMixRegisteredUnregistered = clientRequest c rm2 1 "bad endpoint" mempty >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspBody @?= "" - r3 <- clientRequest c rm1 1 "Hello" mempty - r3 @?= deadlineExceededStatus return () server s = do concurrently regThread unregThread return () where regThread = do let rm = head (registeredMethods s) - r <- serverHandleNormalCall s rm 2 dummyMeta $ \_ body _ -> do + r <- serverHandleNormalCall s rm dummyMeta $ \_ body _ -> do body @?= "Hello" return ("reply test", dummyMeta, StatusOk, StatusDetails "") return () unregThread = do - r1 <- U.serverHandleNormalCall s 2 mempty $ \_ _ _ method -> do - method @?= "/bar" + r1 <- U.serverHandleNormalCall s mempty $ \call _ -> do + U.callMethod call @?= "/bar" return ("", mempty, StatusOk, StatusDetails "Wrong endpoint") - r2 <- U.serverHandleNormalCall s 2 mempty $ \_ _ _ method -> do - method @?= "/bar" - return ("", mempty, StatusNotFound, - StatusDetails "Wrong endpoint") - r2 @?= Left GRPCIOTimeout return () -- TODO: There seems to be a race here (and in other client/server pairs, of @@ -166,7 +127,7 @@ testPayload = server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalCall s rm 11 dummyMeta $ \_ reqBody reqMD -> do + r <- serverHandleNormalCall s rm dummyMeta $ \_ reqBody reqMD -> do reqBody @?= "Hello!" checkMD "Server metadata mismatch" clientMD reqMD return ("reply test", dummyMeta, StatusOk, @@ -185,9 +146,9 @@ testServerCancel = "Received RST_STREAM err=8")) server s = do let rm = head (registeredMethods s) - r <- serverHandleNormalCall s rm 10 mempty $ \c _ _ -> do + r <- serverHandleNormalCall s rm mempty $ \c _ _ -> do serverCallCancel c StatusCancelled "" - return (mempty, mempty, StatusOk, "") + return (mempty, mempty, StatusCancelled, "") r @?= Right () testPayloadUnregistered :: TestTree @@ -201,9 +162,9 @@ testPayloadUnregistered = rspBody @?= "reply test" details @?= "details string" server s = do - r <- U.serverHandleNormalCall s 11 mempty $ \_ body _md meth -> do + r <- U.serverHandleNormalCall s mempty $ \U.ServerCall{..} body -> do body @?= "Hello!" - meth @?= "/foo" + callMethod @?= "/foo" return ("reply test", mempty, StatusOk, "details string") r @?= Right () @@ -223,10 +184,12 @@ testGoaway = lastResult == unavailableStatus || lastResult == deadlineExceededStatus + || + lastResult == Left GRPCIOTimeout server s = do let rm = head (registeredMethods s) - serverHandleNormalCall s rm 11 mempty dummyHandler - serverHandleNormalCall s rm 11 mempty dummyHandler + serverHandleNormalCall s rm mempty dummyHandler + serverHandleNormalCall s rm mempty dummyHandler return () testSlowServer :: TestTree @@ -239,7 +202,7 @@ testSlowServer = result @?= deadlineExceededStatus server s = do let rm = head (registeredMethods s) - serverHandleNormalCall s rm 1 mempty $ \_ _ _ -> do + serverHandleNormalCall s rm mempty $ \_ _ _ -> do threadDelay (2*10^(6 :: Int)) return ("", mempty, StatusOk, StatusDetails "") return () @@ -254,7 +217,7 @@ testServerCallExpirationCheck = return () server s = do let rm = head (registeredMethods s) - serverHandleNormalCall s rm 5 mempty $ \c _ _ -> do + serverHandleNormalCall s rm mempty $ \c _ _ -> do exp1 <- serverCallIsExpired c assertBool "Call isn't expired when handler starts" $ not exp1 threadDelaySecs 1 @@ -263,7 +226,7 @@ testServerCallExpirationCheck = threadDelaySecs 3 exp3 <- serverCallIsExpired c assertBool "Call is expired after 4 seconds" exp3 - return ("", mempty, StatusDetails "") + return ("", mempty, StatusCancelled, StatusDetails "") return () -------------------------------------------------------------------------------- diff --git a/tests/LowLevelTests/Op.hs b/tests/LowLevelTests/Op.hs index a27fb8f..a031bfa 100644 --- a/tests/LowLevelTests/Op.hs +++ b/tests/LowLevelTests/Op.hs @@ -84,7 +84,7 @@ withClientServerUnaryCall grpc f = do -- created. If later we want to send payloads or metadata, we'll need -- to tweak this. clientRes <- runOps (unClientCall cc) (clientCQ c) clientEmptySendOps - withServerCall s srm 10 $ \sc -> + withServerCall s srm $ \sc -> f (c, s, cc, sc) serverConf = (ServerConfig "localhost" 50051 [("/foo", Normal)])