serverRequestCall: block until request received (#30)

* fix error for client timeouts

* registered calls: block until call received, simplify cleanup

* unregistered calls: block until request received

* Fix up tests
This commit is contained in:
Connor Clark 2016-06-22 10:41:14 -07:00
parent f6e244912a
commit ff73b5eb5a
11 changed files with 160 additions and 241 deletions

View file

@ -1,5 +1,6 @@
{-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-}
{-# OPTIONS_GHC -fno-warn-unused-binds #-} {-# OPTIONS_GHC -fno-warn-unused-binds #-}
@ -13,9 +14,10 @@ import qualified Network.GRPC.LowLevel.Call.Unregistered as U
serverMeta :: MetadataMap serverMeta :: MetadataMap
serverMeta = [("test_meta", "test_meta_value")] serverMeta = [("test_meta", "test_meta_value")]
handler :: U.ServerCall -> ByteString -> MetadataMap -> MethodName handler :: U.ServerCall
-> ByteString
-> IO (ByteString, MetadataMap, StatusCode, StatusDetails) -> IO (ByteString, MetadataMap, StatusCode, StatusDetails)
handler _call reqBody _reqMeta _method = do handler U.ServerCall{..} reqBody = do
--putStrLn $ "Got request for method: " ++ show method --putStrLn $ "Got request for method: " ++ show method
--putStrLn $ "Got metadata: " ++ show reqMeta --putStrLn $ "Got metadata: " ++ show reqMeta
return (reqBody, serverMeta, StatusOk, StatusDetails "") return (reqBody, serverMeta, StatusOk, StatusDetails "")
@ -23,7 +25,7 @@ handler _call reqBody _reqMeta _method = do
unregMain :: IO () unregMain :: IO ()
unregMain = withGRPC $ \grpc -> do unregMain = withGRPC $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do
result <- U.serverHandleNormalCall server 15 serverMeta handler result <- U.serverHandleNormalCall server serverMeta handler
case result of case result of
Left x -> putStrLn $ "handle call result error: " ++ show x Left x -> putStrLn $ "handle call result error: " ++ show x
Right _ -> return () Right _ -> return ()
@ -34,7 +36,7 @@ regMain = withGRPC $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> withServer grpc (ServerConfig "localhost" 50051 methods) $ \server ->
forever $ do forever $ do
let method = head (registeredMethods server) let method = head (registeredMethods server)
result <- serverHandleNormalCall server method 15 serverMeta $ result <- serverHandleNormalCall server method serverMeta $
\_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk, \_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk,
StatusDetails "") StatusDetails "")
case result of case result of
@ -44,7 +46,7 @@ regMain = withGRPC $ \grpc -> do
-- | loop to fork n times -- | loop to fork n times
regLoop :: Server -> RegisteredMethod -> IO () regLoop :: Server -> RegisteredMethod -> IO ()
regLoop server method = forever $ do regLoop server method = forever $ do
result <- serverHandleNormalCall server method 15 serverMeta $ result <- serverHandleNormalCall server method serverMeta $
\_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk, \_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk,
StatusDetails "") StatusDetails "")
case result of case result of

View file

@ -65,10 +65,9 @@ clientCallCancel cc = C.grpcCallCancel (unClientCall cc) C.reserved
-- the C state needed to respond to a registered call. -- the C state needed to respond to a registered call.
data ServerCall = ServerCall data ServerCall = ServerCall
{ unServerCall :: C.Call, { unServerCall :: C.Call,
requestMetadataRecv :: Ptr C.MetadataArray, requestMetadataRecv :: MetadataMap,
optionalPayload :: Ptr C.ByteBuffer, optionalPayload :: Maybe ByteString,
parentPtr :: Maybe (Ptr C.Call), parentPtr :: Maybe (Ptr C.Call),
callDeadlinePtr :: C.CTimeSpecPtr,
callDeadline :: TimeSpec callDeadline :: TimeSpec
} }
@ -76,24 +75,6 @@ serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO ()
serverCallCancel sc code reason = serverCallCancel sc code reason =
C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved
serverCallGetMetadata :: ServerCall -> IO MetadataMap
serverCallGetMetadata ServerCall{..} = do
marray <- peek requestMetadataRecv
C.getAllMetadataArray marray
-- | Extract the client request body from the given call, if present. TODO: the
-- reason this returns @Maybe ByteString@ is because the gRPC library calls the
-- underlying out parameter "optional_payload". I am not sure exactly in what
-- cases it won't be present. The C++ library checks a has_request_payload_ bool
-- and passes in nullptr to request_registered_call if the bool is false, so we
-- may be able to do the payload present/absent check earlier.
serverCallGetPayload :: ServerCall -> IO (Maybe ByteString)
serverCallGetPayload ServerCall{..} = do
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
if rawPtr == nullPtr
then return Nothing
else Just <$> C.copyByteBufferToByteString bb
serverCallIsExpired :: ServerCall -> IO Bool serverCallIsExpired :: ServerCall -> IO Bool
serverCallIsExpired sc = do serverCallIsExpired sc = do
currTime <- getTime Monotonic currTime <- getTime Monotonic
@ -110,24 +91,16 @@ debugClientCall = const $ return ()
debugServerCall :: ServerCall -> IO () debugServerCall :: ServerCall -> IO ()
#ifdef DEBUG #ifdef DEBUG
debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _ _) = do debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do
grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr)
grpcDebug $ "debugServerCall(R): metadata ptr: " grpcDebug $ "debugServerCall(R): metadata ptr: "
++ show (requestMetadataRecv call) ++ show (requestMetadataRecv call)
metadataArr <- peek (requestMetadataRecv call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata)
grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call) grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call)
payload <- peek (optionalPayload call)
bs <- C.copyByteBufferToByteString payload
grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs
forM_ (parentPtr call) $ \parentPtr' -> do forM_ (parentPtr call) $ \parentPtr' -> do
grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr' grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr' (C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerCall(R): parent: " ++ show parent grpcDebug $ "debugServerCall(R): parent: " ++ show parent
grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call)
timespec <- peek (callDeadlinePtr call)
grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec)
#else #else
{-# INLINE debugServerCall #-} {-# INLINE debugServerCall #-}
debugServerCall = const $ return () debugServerCall = const $ return ()
@ -144,11 +117,5 @@ destroyServerCall call@ServerCall{..} = do
debugServerCall call debugServerCall call
grpcDebug $ "Destroying server-side call object: " ++ show unServerCall grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
C.grpcCallDestroy unServerCall C.grpcCallDestroy unServerCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "destroying optional payload" ++ show optionalPayload
C.destroyReceivingByteBuffer optionalPayload
grpcDebug $ "freeing parentPtr: " ++ show parentPtr grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtr free forM_ parentPtr free
grpcDebug $ "destroying deadline." ++ show callDeadline
C.timespecDestroy callDeadlinePtr

View file

@ -6,6 +6,8 @@ import Control.Monad
import Foreign.Marshal.Alloc (free) import Foreign.Marshal.Alloc (free)
import Foreign.Ptr (Ptr) import Foreign.Ptr (Ptr)
import Foreign.Storable (peek) import Foreign.Storable (peek)
import System.Clock (TimeSpec)
import Network.GRPC.LowLevel.Call (Host (..), MethodName (..)) import Network.GRPC.LowLevel.Call (Host (..), MethodName (..))
import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
@ -17,28 +19,17 @@ import qualified Network.GRPC.Unsafe.Op as C
-- call. -- call.
data ServerCall = ServerCall data ServerCall = ServerCall
{ unServerCall :: C.Call { unServerCall :: C.Call
, requestMetadataRecv :: Ptr C.MetadataArray , requestMetadataRecv :: MetadataMap
, parentPtr :: Maybe (Ptr C.Call) , parentPtr :: Maybe (Ptr C.Call)
, callDetails :: C.CallDetails , callDeadline :: TimeSpec
, callMethod :: MethodName
, callHost :: Host
} }
serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO () serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO ()
serverCallCancel sc code reason = serverCallCancel sc code reason =
C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved C.grpcCallCancelWithStatus (unServerCall sc) code reason C.reserved
serverCallGetMetadata :: ServerCall -> IO MetadataMap
serverCallGetMetadata ServerCall{..} = do
marray <- peek requestMetadataRecv
C.getAllMetadataArray marray
serverCallGetMethodName :: ServerCall -> IO MethodName
serverCallGetMethodName ServerCall{..} =
MethodName <$> C.callDetailsGetMethod callDetails
serverCallGetHost :: ServerCall -> IO Host
serverCallGetHost ServerCall{..} =
Host <$> C.callDetailsGetHost callDetails
debugServerCall :: ServerCall -> IO () debugServerCall :: ServerCall -> IO ()
#ifdef DEBUG #ifdef DEBUG
debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do
@ -66,9 +57,5 @@ destroyServerCall call@ServerCall{..} = do
debugServerCall call debugServerCall call
grpcDebug $ "Destroying server-side call object: " ++ show unServerCall grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
C.grpcCallDestroy unServerCall C.grpcCallDestroy unServerCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "freeing parentPtr: " ++ show parentPtr grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtr free forM_ parentPtr free
grpcDebug $ "destroying call details: " ++ show callDetails
C.destroyCallDetails callDetails

View file

@ -34,6 +34,7 @@ import Control.Concurrent.STM (atomically, check)
import Control.Concurrent.STM.TVar (newTVarIO, readTVar, import Control.Concurrent.STM.TVar (newTVarIO, readTVar,
writeTVar) writeTVar)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (liftM2)
import Data.IORef (newIORef) import Data.IORef (newIORef)
import Data.List (intersperse) import Data.List (intersperse)
import Foreign.Marshal.Alloc (free, malloc) import Foreign.Marshal.Alloc (free, malloc)
@ -51,6 +52,8 @@ import System.Timeout (timeout)
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.CompletionQueue.Internal import Network.GRPC.LowLevel.CompletionQueue.Internal
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a
withCompletionQueue grpc = bracket (createCompletionQueue grpc) withCompletionQueue grpc = bracket (createCompletionQueue grpc)
@ -131,61 +134,69 @@ channelCreateCall
-- | Create the call object to handle a registered call. -- | Create the call object to handle a registered call.
serverRequestCall :: C.Server serverRequestCall :: C.Server
-> CompletionQueue -> CompletionQueue
-> TimeoutSeconds
-> RegisteredMethod -> RegisteredMethod
-> IO (Either GRPCIOError ServerCall) -> IO (Either GRPCIOError ServerCall)
serverRequestCall serverRequestCall
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} = server cq@CompletionQueue{..} RegisteredMethod{..} =
withPermission Push cq $ do withPermission Push cq $
-- NOTE: the below stuff is freed when we free the call we return. bracket (liftM2 (,) malloc malloc)
deadlinePtr <- malloc (\(p1,p2) -> free p1 >> free p2)
callPtr <- malloc $ \(deadlinePtr, callPtr) ->
metadataArrayPtr <- C.metadataArrayCreate C.withByteBufferPtr $ \bbPtr ->
metadataArray <- peek metadataArrayPtr C.withMetadataArrayPtr $ \metadataArrayPtr -> do
bbPtr <- malloc metadataArray <- peek metadataArrayPtr
tag <- newTag cq tag <- newTag cq
grpcDebug $ "serverRequestCall(R): tag is " ++ show tag grpcDebug $ "serverRequestCall(R): tag is " ++ show tag
callError <- C.grpcServerRequestRegisteredCall callError <- C.grpcServerRequestRegisteredCall
server methodHandle callPtr deadlinePtr server methodHandle callPtr deadlinePtr
metadataArray bbPtr unsafeCQ unsafeCQ tag metadataArray bbPtr unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall(R): callError: " grpcDebug $ "serverRequestCall(R): callError: "
++ show callError ++ show callError
if callError /= C.CallOk if callError /= C.CallOk
then do grpcDebug "serverRequestCall(R): callError. cleaning up" then do grpcDebug "serverRequestCall(R): callError. cleaning up"
failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr return $ Left $ GRPCIOCallError callError
return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag Nothing
else do pluckResult <- pluck cq tag (Just timeLimit) grpcDebug $ "serverRequestCall(R): finished pluck:"
grpcDebug "serverRequestCall(R): finished pluck." ++ show pluckResult
case pluckResult of case pluckResult of
Left x -> do Left x -> do
grpcDebug "serverRequestCall(R): cleanup pluck err" grpcDebug "serverRequestCall(R): cleanup pluck err"
failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr return $ Left x
return $ Left x Right () -> do
Right () -> do rawCall <- peek callPtr
rawCall <- peek callPtr deadline <- convertDeadline deadlinePtr
deadline <- convertDeadline deadlinePtr payload <- convertPayload bbPtr
let assembledCall = ServerCall rawCall metadataArrayPtr meta <- convertMeta metadataArrayPtr
bbPtr Nothing deadlinePtr deadline let assembledCall = ServerCall rawCall
return $ Right assembledCall meta
--TODO: the gRPC library appears to hold onto these pointers for a random payload
-- amount of time, even after returning from the only call that uses them. Nothing
-- This results in malloc errors if deadline
-- gRPC tries to modify them after we free them. To work around it, grpcDebug "serverRequestCall(R): About to return"
-- we sleep for a while before freeing the objects. We should find a return $ Right assembledCall
-- permanent solution that's more robust. where convertDeadline deadline = do
where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall(R): doing delayed cleanup."
C.timespecDestroy deadline
free callPtr
C.metadataArrayDestroy metadataArrayPtr
free bbPtr
convertDeadline deadline = do
--gRPC gives us a deadline that is just a delta, so we convert it --gRPC gives us a deadline that is just a delta, so we convert it
--to a proper deadline. --to a proper deadline.
deadline' <- C.timeSpec <$> peek deadline deadline' <- C.timeSpec <$> peek deadline
now <- getTime Monotonic now <- getTime Monotonic
return $ now + deadline' return $ now + deadline'
convertPayload bbPtr = do
-- TODO: the reason this returns @Maybe ByteString@ is because the
-- gRPC library calls the underlying out parameter
-- "optional_payload". I am not sure exactly in what cases it
-- won't be present. The C++ library checks a
-- has_request_payload_ bool and passes in nullptr to
-- request_registered_call if the bool is false, so we
-- may be able to do the payload present/absent check earlier.
bb@(C.ByteBuffer rawPtr) <- peek bbPtr
if rawPtr == nullPtr
then return Nothing
else do bs <- C.copyByteBufferToByteString bb
return $ Just bs
convertMeta requestMetadataRecv = do
mArray <- peek requestMetadataRecv
metamap <- C.getAllMetadataArray mArray
return metamap
-- | Register the server's completion queue. Must be done before the server is -- | Register the server's completion queue. Must be done before the server is
-- started. -- started.

View file

@ -121,6 +121,7 @@ pluck cq@CompletionQueue{..} tag waitSeconds = do
eventToError :: C.Event -> (Either GRPCIOError a) eventToError :: C.Event -> (Either GRPCIOError a)
eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
eventToError (C.Event C.OpComplete False _) = Left GRPCIOTimeout
eventToError _ = Left GRPCIOUnknownError eventToError _ = Left GRPCIOUnknownError
-- | Returns true iff the given grpc_event was a success. -- | Returns true iff the given grpc_event was a success.

View file

@ -3,6 +3,7 @@
module Network.GRPC.LowLevel.CompletionQueue.Unregistered where module Network.GRPC.LowLevel.CompletionQueue.Unregistered where
import Control.Concurrent (forkIO) import Control.Concurrent (forkIO)
import Control.Exception (bracket)
import Foreign.Marshal.Alloc (free, malloc) import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Storable (peek) import Foreign.Storable (peek)
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
@ -31,50 +32,45 @@ channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
serverRequestCall :: C.Server serverRequestCall :: C.Server
-> CompletionQueue -> CompletionQueue
-> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall) -> IO (Either GRPCIOError U.ServerCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit = serverRequestCall server cq@CompletionQueue{..} =
withPermission Push cq $ do withPermission Push cq $
callPtr <- malloc bracket malloc free $ \callPtr ->
grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr C.withMetadataArrayPtr $ \metadataArrayPtr ->
callDetails <- C.createCallDetails C.withCallDetails $ \callDetails -> do
metadataArrayPtr <- C.metadataArrayCreate grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr
metadataArray <- peek metadataArrayPtr metadataArray <- peek metadataArrayPtr
tag <- newTag cq tag <- newTag cq
callError <- C.grpcServerRequestCall server callPtr callDetails callError <- C.grpcServerRequestCall server callPtr callDetails
metadataArray unsafeCQ unsafeCQ tag metadataArray unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall: callError was " ++ show callError grpcDebug $ "serverRequestCall: callError was " ++ show callError
if callError /= C.CallOk if callError /= C.CallOk
then do grpcDebug "serverRequestCall: got call error; cleaning up." then do grpcDebug "serverRequestCall: got call error; cleaning up."
failureCleanup callPtr callDetails metadataArrayPtr return $ Left $ GRPCIOCallError callError
return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag Nothing
else do pluckResult <- pluck cq tag (Just timeLimit) grpcDebug $ "serverRequestCall: pluckResult was "
grpcDebug $ "serverRequestCall: pluckResult was " ++ show pluckResult
++ show pluckResult case pluckResult of
case pluckResult of Left x -> do
Left x -> do grpcDebug "serverRequestCall: pluck error."
grpcDebug "serverRequestCall: pluck error; cleaning up." return $ Left x
failureCleanup callPtr callDetails Right () -> do
metadataArrayPtr rawCall <- peek callPtr
return $ Left x metadata <- C.getAllMetadataArray metadataArray
Right () -> do deadline <- getDeadline callDetails
rawCall <- peek callPtr method <- getMethod callDetails
let call = U.ServerCall rawCall host <- getHost callDetails
metadataArrayPtr let call = U.ServerCall rawCall
Nothing metadata
callDetails Nothing
return $ Right call deadline
method
host
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random where getDeadline callDetails = do
-- amount of time, even after returning from the only call that uses them. C.timeSpec <$> C.callDetailsGetDeadline callDetails
-- This results in malloc errors if getMethod callDetails =
-- gRPC tries to modify them after we free them. To work around it, MethodName <$> C.callDetailsGetMethod callDetails
-- we sleep for a while before freeing the objects. We should find a getHost callDetails =
-- permanent solution that's more robust. Host <$> C.callDetailsGetHost callDetails
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall: doing delayed cleanup."
free callPtr
C.destroyCallDetails callDetails
C.metadataArrayDestroy metadataArrayPtr
return ()

View file

@ -126,18 +126,16 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
-- method. -- method.
serverCreateCall :: Server serverCreateCall :: Server
-> RegisteredMethod -> RegisteredMethod
-> TimeoutSeconds
-> IO (Either GRPCIOError ServerCall) -> IO (Either GRPCIOError ServerCall)
serverCreateCall Server{..} rm timeLimit = serverCreateCall Server{..} rm =
serverRequestCall internalServer serverCQ timeLimit rm serverRequestCall internalServer serverCQ rm
withServerCall :: Server withServerCall :: Server
-> RegisteredMethod -> RegisteredMethod
-> TimeoutSeconds
-> (ServerCall -> IO (Either GRPCIOError a)) -> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a) -> IO (Either GRPCIOError a)
withServerCall server regmethod timeout f = do withServerCall server regmethod f = do
createResult <- serverCreateCall server regmethod timeout createResult <- serverCreateCall server regmethod
case createResult of case createResult of
Left x -> return $ Left x Left x -> return $ Left x
Right call -> f call `finally` logDestroy call Right call -> f call `finally` logDestroy call
@ -177,22 +175,21 @@ type ServerHandler
-- | Wait for and then handle a normal (non-streaming) call. -- | Wait for and then handle a normal (non-streaming) call.
serverHandleNormalCall :: Server serverHandleNormalCall :: Server
-> RegisteredMethod -> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap -> MetadataMap
-- ^ Initial server metadata -- ^ Initial server metadata
-> ServerHandler -> ServerHandler
-> IO (Either GRPCIOError ()) -> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} rm timeLimit initMeta f = do serverHandleNormalCall s@Server{..} rm initMeta f = do
withServerCall s rm timeLimit $ \call -> do withServerCall s rm $ \call -> do
grpcDebug "serverHandleNormalCall(R): starting batch." grpcDebug "serverHandleNormalCall(R): starting batch."
debugServerCall call debugServerCall call
payload <- serverCallGetPayload call let payload = optionalPayload call
case payload of case payload of
--TODO: what should we do with an empty payload? Have the handler take --TODO: what should we do with an empty payload? Have the handler take
-- @Maybe ByteString@? Need to figure out when/why payload would be empty. -- @Maybe ByteString@? Need to figure out when/why payload would be empty.
Nothing -> error "serverHandleNormalCall(R): payload empty." Nothing -> error "serverHandleNormalCall(R): payload empty."
Just requestBody -> do Just requestBody -> do
requestMeta <- serverCallGetMetadata call let requestMeta = requestMetadataRecv call
(respBody, trailingMeta, status, details) <- f call (respBody, trailingMeta, status, details) <- f call
requestBody requestBody
requestMeta requestMeta

View file

@ -13,16 +13,15 @@ import Network.GRPC.LowLevel.Op (Op(..), OpR
import Network.GRPC.LowLevel.Server (Server (..)) import Network.GRPC.LowLevel.Server (Server (..))
import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Op as C
serverCreateCall :: Server -> TimeoutSeconds serverCreateCall :: Server -> IO (Either GRPCIOError ServerCall)
-> IO (Either GRPCIOError ServerCall) serverCreateCall Server{..} =
serverCreateCall Server{..} timeLimit = serverRequestCall internalServer serverCQ
serverRequestCall internalServer serverCQ timeLimit
withServerCall :: Server -> TimeoutSeconds withServerCall :: Server
-> (ServerCall -> IO (Either GRPCIOError a)) -> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a) -> IO (Either GRPCIOError a)
withServerCall server timeout f = do withServerCall server f = do
createResult <- serverCreateCall server timeout createResult <- serverCreateCall server
case createResult of case createResult of
Left x -> return $ Left x Left x -> return $ Left x
Right call -> f call `finally` logDestroy call Right call -> f call `finally` logDestroy call
@ -52,37 +51,29 @@ serverOpsSendNormalResponse body metadata code details =
-- | A handler for an unregistered server call; bytestring arguments are the -- | A handler for an unregistered server call; bytestring arguments are the
-- request body and response body respectively. -- request body and response body respectively.
type ServerHandler type ServerHandler
= ServerCall -> ByteString -> MetadataMap -> MethodName = ServerCall -> ByteString
-> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails) -> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails)
-- | Handle one unregistered call. -- | Handle one unregistered call.
serverHandleNormalCall :: Server serverHandleNormalCall :: Server
-> TimeoutSeconds
-> MetadataMap -- ^ Initial server metadata. -> MetadataMap -- ^ Initial server metadata.
-> ServerHandler -> ServerHandler
-> IO (Either GRPCIOError ()) -> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do serverHandleNormalCall s@Server{..} srvMetadata f = do
withServerCall s timeLimit $ \call -> do withServerCall s $ \call@ServerCall{..} -> do
grpcDebug "serverHandleNormalCall(U): starting batch." grpcDebug "serverHandleNormalCall(U): starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata let recvOps = serverOpsGetNormalCall srvMetadata
call' = unServerCall call opResults <- runOps unServerCall serverCQ recvOps
opResults <- runOps call' serverCQ recvOps
case opResults of case opResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
return $ Left x return $ Left x
Right [OpRecvMessageResult (Just body)] -> do Right [OpRecvMessageResult (Just body)] -> do
requestMeta <- serverCallGetMetadata call grpcDebug $ "got client metadata: " ++ show requestMetadataRecv
grpcDebug $ "got client metadata: " ++ show requestMeta grpcDebug $ "call_details host is: " ++ show callHost
methodName <- serverCallGetMethodName call (respBody, respMetadata, status, details) <- f call body
hostName <- serverCallGetHost call
grpcDebug $ "call_details host is: " ++ show hostName
(respBody, respMetadata, status, details) <- f call
body
requestMeta
methodName
let respOps = serverOpsSendNormalResponse let respOps = serverOpsSendNormalResponse
respBody respMetadata status details respBody respMetadata status details
respOpsResults <- runOps call' serverCQ respOps respOpsResults <- runOps unServerCall serverCQ respOps
case respOpsResults of case respOpsResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed."
return $ Left x return $ Left x

View file

@ -2,6 +2,7 @@
module Network.GRPC.Unsafe where module Network.GRPC.Unsafe where
import Control.Exception (bracket)
import Control.Monad import Control.Monad
import Foreign.C.Types import Foreign.C.Types
@ -46,6 +47,9 @@ deriving instance Show CallDetails
{#fun create_call_details as ^ {} -> `CallDetails'#} {#fun create_call_details as ^ {} -> `CallDetails'#}
{#fun destroy_call_details as ^ {`CallDetails'} -> `()'#} {#fun destroy_call_details as ^ {`CallDetails'} -> `()'#}
withCallDetails :: (CallDetails -> IO a) -> IO a
withCallDetails = bracket createCallDetails destroyCallDetails
-- instance def adapted from -- instance def adapted from
-- https://mail.haskell.org/pipermail/c2hs/2007-June/000800.html -- https://mail.haskell.org/pipermail/c2hs/2007-June/000800.html
instance Storable Call where instance Storable Call where

View file

@ -11,6 +11,7 @@ import Control.Monad
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import qualified Data.Map as M import qualified Data.Map as M
import Network.GRPC.LowLevel import Network.GRPC.LowLevel
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
import qualified Network.GRPC.LowLevel.Client.Unregistered as U import qualified Network.GRPC.LowLevel.Client.Unregistered as U
import qualified Network.GRPC.LowLevel.Server.Unregistered as U import qualified Network.GRPC.LowLevel.Server.Unregistered as U
import Test.Tasty import Test.Tasty
@ -29,9 +30,6 @@ lowLevelTests = testGroup "Unit tests of low-level Haskell library"
, testClientCall , testClientCall
, testClientTimeoutNoServer , testClientTimeoutNoServer
, testServerCreateDestroy , testServerCreateDestroy
, testServerCall
, testServerTimeoutNoClient
, testWrongEndpoint
, testMixRegisteredUnregistered , testMixRegisteredUnregistered
, testPayload , testPayload
, testPayloadUnregistered , testPayloadUnregistered
@ -65,42 +63,12 @@ testClientTimeoutNoServer =
clientOnlyTest "request timeout when server DNE" $ \c -> do clientOnlyTest "request timeout when server DNE" $ \c -> do
rm <- clientRegisterMethod c "/foo" Normal rm <- clientRegisterMethod c "/foo" Normal
r <- clientRequest c rm 1 "Hello" mempty r <- clientRequest c rm 1 "Hello" mempty
r @?= Left GRPCIOUnknownError r @?= Left GRPCIOTimeout
testServerCreateDestroy :: TestTree testServerCreateDestroy :: TestTree
testServerCreateDestroy = testServerCreateDestroy =
serverOnlyTest "start/stop" [] nop serverOnlyTest "start/stop" [] nop
testServerCall :: TestTree
testServerCall =
serverOnlyTest "create/destroy call" [] $ \s -> do
r <- U.withServerCall s 1 $ const $ return $ Right ()
r @?= Left GRPCIOTimeout
testServerTimeoutNoClient :: TestTree
testServerTimeoutNoClient =
serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do
let rm = head (registeredMethods s)
r <- serverHandleNormalCall s rm 1 mempty dummyHandler
r @?= Left GRPCIOTimeout
testWrongEndpoint :: TestTree
testWrongEndpoint =
csTest "client requests unknown endpoint" client server [("/foo", Normal)]
where
-- TODO: possibly factor out dead-simple client/server construction even
-- further
client c = do
rm <- clientRegisterMethod c "/bar" Normal
r <- clientRequest c rm 1 "Hello!" mempty
r @?= Left (GRPCIOBadStatusCode StatusDeadlineExceeded
(StatusDetails "Deadline Exceeded"))
server s = do
length (registeredMethods s) @?= 1
let rm = head (registeredMethods s)
r <- serverHandleNormalCall s rm 2 mempty dummyHandler
r @?= Left GRPCIOTimeout
testMixRegisteredUnregistered :: TestTree testMixRegisteredUnregistered :: TestTree
testMixRegisteredUnregistered = testMixRegisteredUnregistered =
csTest "server uses unregistered calls to handle unknown endpoints" csTest "server uses unregistered calls to handle unknown endpoints"
@ -119,28 +87,21 @@ testMixRegisteredUnregistered =
clientRequest c rm2 1 "bad endpoint" mempty >>= do clientRequest c rm2 1 "bad endpoint" mempty >>= do
checkReqRslt $ \NormalRequestResult{..} -> do checkReqRslt $ \NormalRequestResult{..} -> do
rspBody @?= "" rspBody @?= ""
r3 <- clientRequest c rm1 1 "Hello" mempty
r3 @?= deadlineExceededStatus
return () return ()
server s = do server s = do
concurrently regThread unregThread concurrently regThread unregThread
return () return ()
where regThread = do where regThread = do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalCall s rm 2 dummyMeta $ \_ body _ -> do r <- serverHandleNormalCall s rm dummyMeta $ \_ body _ -> do
body @?= "Hello" body @?= "Hello"
return ("reply test", dummyMeta, StatusOk, StatusDetails "") return ("reply test", dummyMeta, StatusOk, StatusDetails "")
return () return ()
unregThread = do unregThread = do
r1 <- U.serverHandleNormalCall s 2 mempty $ \_ _ _ method -> do r1 <- U.serverHandleNormalCall s mempty $ \call _ -> do
method @?= "/bar" U.callMethod call @?= "/bar"
return ("", mempty, StatusOk, return ("", mempty, StatusOk,
StatusDetails "Wrong endpoint") StatusDetails "Wrong endpoint")
r2 <- U.serverHandleNormalCall s 2 mempty $ \_ _ _ method -> do
method @?= "/bar"
return ("", mempty, StatusNotFound,
StatusDetails "Wrong endpoint")
r2 @?= Left GRPCIOTimeout
return () return ()
-- TODO: There seems to be a race here (and in other client/server pairs, of -- TODO: There seems to be a race here (and in other client/server pairs, of
@ -166,7 +127,7 @@ testPayload =
server s = do server s = do
length (registeredMethods s) @?= 1 length (registeredMethods s) @?= 1
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalCall s rm 11 dummyMeta $ \_ reqBody reqMD -> do r <- serverHandleNormalCall s rm dummyMeta $ \_ reqBody reqMD -> do
reqBody @?= "Hello!" reqBody @?= "Hello!"
checkMD "Server metadata mismatch" clientMD reqMD checkMD "Server metadata mismatch" clientMD reqMD
return ("reply test", dummyMeta, StatusOk, return ("reply test", dummyMeta, StatusOk,
@ -185,9 +146,9 @@ testServerCancel =
"Received RST_STREAM err=8")) "Received RST_STREAM err=8"))
server s = do server s = do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalCall s rm 10 mempty $ \c _ _ -> do r <- serverHandleNormalCall s rm mempty $ \c _ _ -> do
serverCallCancel c StatusCancelled "" serverCallCancel c StatusCancelled ""
return (mempty, mempty, StatusOk, "") return (mempty, mempty, StatusCancelled, "")
r @?= Right () r @?= Right ()
testPayloadUnregistered :: TestTree testPayloadUnregistered :: TestTree
@ -201,9 +162,9 @@ testPayloadUnregistered =
rspBody @?= "reply test" rspBody @?= "reply test"
details @?= "details string" details @?= "details string"
server s = do server s = do
r <- U.serverHandleNormalCall s 11 mempty $ \_ body _md meth -> do r <- U.serverHandleNormalCall s mempty $ \U.ServerCall{..} body -> do
body @?= "Hello!" body @?= "Hello!"
meth @?= "/foo" callMethod @?= "/foo"
return ("reply test", mempty, StatusOk, "details string") return ("reply test", mempty, StatusOk, "details string")
r @?= Right () r @?= Right ()
@ -223,10 +184,12 @@ testGoaway =
lastResult == unavailableStatus lastResult == unavailableStatus
|| ||
lastResult == deadlineExceededStatus lastResult == deadlineExceededStatus
||
lastResult == Left GRPCIOTimeout
server s = do server s = do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
serverHandleNormalCall s rm 11 mempty dummyHandler serverHandleNormalCall s rm mempty dummyHandler
serverHandleNormalCall s rm 11 mempty dummyHandler serverHandleNormalCall s rm mempty dummyHandler
return () return ()
testSlowServer :: TestTree testSlowServer :: TestTree
@ -239,7 +202,7 @@ testSlowServer =
result @?= deadlineExceededStatus result @?= deadlineExceededStatus
server s = do server s = do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
serverHandleNormalCall s rm 1 mempty $ \_ _ _ -> do serverHandleNormalCall s rm mempty $ \_ _ _ -> do
threadDelay (2*10^(6 :: Int)) threadDelay (2*10^(6 :: Int))
return ("", mempty, StatusOk, StatusDetails "") return ("", mempty, StatusOk, StatusDetails "")
return () return ()
@ -254,7 +217,7 @@ testServerCallExpirationCheck =
return () return ()
server s = do server s = do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
serverHandleNormalCall s rm 5 mempty $ \c _ _ -> do serverHandleNormalCall s rm mempty $ \c _ _ -> do
exp1 <- serverCallIsExpired c exp1 <- serverCallIsExpired c
assertBool "Call isn't expired when handler starts" $ not exp1 assertBool "Call isn't expired when handler starts" $ not exp1
threadDelaySecs 1 threadDelaySecs 1
@ -263,7 +226,7 @@ testServerCallExpirationCheck =
threadDelaySecs 3 threadDelaySecs 3
exp3 <- serverCallIsExpired c exp3 <- serverCallIsExpired c
assertBool "Call is expired after 4 seconds" exp3 assertBool "Call is expired after 4 seconds" exp3
return ("", mempty, StatusDetails "") return ("", mempty, StatusCancelled, StatusDetails "")
return () return ()
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------

View file

@ -84,7 +84,7 @@ withClientServerUnaryCall grpc f = do
-- created. If later we want to send payloads or metadata, we'll need -- created. If later we want to send payloads or metadata, we'll need
-- to tweak this. -- to tweak this.
clientRes <- runOps (unClientCall cc) (clientCQ c) clientEmptySendOps clientRes <- runOps (unClientCall cc) (clientCQ c) clientEmptySendOps
withServerCall s srm 10 $ \sc -> withServerCall s srm $ \sc ->
f (c, s, cc, sc) f (c, s, cc, sc)
serverConf = (ServerConfig "localhost" 50051 [("/foo", Normal)]) serverConf = (ServerConfig "localhost" 50051 [("/foo", Normal)])