Merge pull request #23 from awakenetworks/joel/unreg-vs-reg-prelim-refactor

Reg vs. unreg call module org and naming (preliminary refactor)
This commit is contained in:
Joel Stanley 2016-06-08 16:26:29 -05:00
commit e46d0b1b7e
16 changed files with 729 additions and 649 deletions

View file

@ -5,15 +5,16 @@
import Control.Monad import Control.Monad
import Network.GRPC.LowLevel import Network.GRPC.LowLevel
import qualified Network.GRPC.LowLevel.Client.Unregistered as U
echoMethod = MethodName "/echo.Echo/DoEcho" echoMethod = MethodName "/echo.Echo/DoEcho"
unregistered c = do unregistered c = do
clientRequest c echoMethod 1 "hi" mempty U.clientRequest c echoMethod 1 "hi" mempty
registered c = do registered c = do
meth <- clientRegisterMethod c echoMethod Normal meth <- clientRegisterMethod c echoMethod Normal
clientRegisteredRequest c meth 1 "hi" mempty clientRequest c meth 1 "hi" mempty
run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c -> run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c ->
f c >>= \case f c >>= \case

View file

@ -1,18 +1,20 @@
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
{-# OPTIONS_GHC -fno-warn-unused-binds #-}
import Control.Concurrent.Async (async, wait) import Control.Concurrent.Async (async, wait)
import Control.Monad (forever) import Control.Monad (forever)
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import qualified Data.Map as M
import Network.GRPC.LowLevel import Network.GRPC.LowLevel
import qualified Network.GRPC.LowLevel.Server.Unregistered as U
serverMeta :: MetadataMap serverMeta :: MetadataMap
serverMeta = M.fromList [("test_meta", "test_meta_value")] serverMeta = [("test_meta", "test_meta_value")]
handler :: ByteString -> MetadataMap -> MethodName handler :: ByteString -> MetadataMap -> MethodName
-> IO (ByteString, MetadataMap, StatusDetails) -> IO (ByteString, MetadataMap, StatusDetails)
handler reqBody reqMeta method = do handler reqBody _reqMeta _method = do
--putStrLn $ "Got request for method: " ++ show method --putStrLn $ "Got request for method: " ++ show method
--putStrLn $ "Got metadata: " ++ show reqMeta --putStrLn $ "Got metadata: " ++ show reqMeta
return (reqBody, serverMeta, StatusDetails "") return (reqBody, serverMeta, StatusDetails "")
@ -20,7 +22,7 @@ handler reqBody reqMeta method = do
unregMain :: IO () unregMain :: IO ()
unregMain = withGRPC $ \grpc -> do unregMain = withGRPC $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do
result <- serverHandleNormalCall server 15 serverMeta handler result <- U.serverHandleNormalCall server 15 serverMeta handler
case result of case result of
Left x -> putStrLn $ "handle call result error: " ++ show x Left x -> putStrLn $ "handle call result error: " ++ show x
Right _ -> return () Right _ -> return ()
@ -31,7 +33,7 @@ regMain = withGRPC $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> withServer grpc (ServerConfig "localhost" 50051 methods) $ \server ->
forever $ do forever $ do
let method = head (registeredMethods server) let method = head (registeredMethods server)
result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ result <- serverHandleNormalCall server method 15 serverMeta $
\reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta,
StatusDetails "") StatusDetails "")
case result of case result of
@ -41,7 +43,7 @@ regMain = withGRPC $ \grpc -> do
-- | loop to fork n times -- | loop to fork n times
regLoop :: Server -> RegisteredMethod -> IO () regLoop :: Server -> RegisteredMethod -> IO ()
regLoop server method = forever $ do regLoop server method = forever $ do
result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ result <- serverHandleNormalCall server method 15 serverMeta $
\reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta,
StatusDetails "") StatusDetails "")
case result of case result of

View file

@ -42,12 +42,17 @@ library
Network.GRPC.Unsafe.Op Network.GRPC.Unsafe.Op
Network.GRPC.Unsafe Network.GRPC.Unsafe
Network.GRPC.LowLevel Network.GRPC.LowLevel
Network.GRPC.LowLevel.Server.Unregistered
Network.GRPC.LowLevel.Client.Unregistered
other-modules: other-modules:
Network.GRPC.LowLevel.CompletionQueue Network.GRPC.LowLevel.CompletionQueue
Network.GRPC.LowLevel.CompletionQueue.Internal
Network.GRPC.LowLevel.CompletionQueue.Unregistered
Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.GRPC
Network.GRPC.LowLevel.Op Network.GRPC.LowLevel.Op
Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Server
Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call
Network.GRPC.LowLevel.Call.Unregistered
Network.GRPC.LowLevel.Client Network.GRPC.LowLevel.Client
extra-libraries: extra-libraries:
grpc grpc

View file

@ -29,14 +29,11 @@ GRPC
-- * Server -- * Server
, ServerConfig(..) , ServerConfig(..)
, Server , Server
, ServerRegCall , ServerCall
, ServerUnregCall
, registeredMethods , registeredMethods
, withServer , withServer
, serverHandleNormalRegisteredCall
, serverHandleNormalCall , serverHandleNormalCall
, withServerUnregCall , withServerCall
, withServerRegisteredCall
-- * Client -- * Client
, ClientConfig(..) , ClientConfig(..)
@ -46,14 +43,10 @@ GRPC
, clientConnectivity , clientConnectivity
, withClient , withClient
, clientRegisterMethod , clientRegisterMethod
, clientRegisteredRequest
, clientRequest , clientRequest
, withClientCall , withClientCall
-- * Ops -- * Ops
, runClientOps
, runServerRegOps
, runServerUnregOps
, Op(..) , Op(..)
, OpRecvResult(..) , OpRecvResult(..)

View file

@ -1,6 +1,9 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RecordWildCards #-}
-- | This module defines data structures and operations pertaining to registered
-- calls; for unregistered call support, see
-- `Network.GRPC.LowLevel.Call.Unregistered`.
module Network.GRPC.LowLevel.Call where module Network.GRPC.LowLevel.Call where
import Control.Monad import Control.Monad
@ -11,11 +14,11 @@ import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek) import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Time as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.ByteBuffer as C import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Time as C
import Network.GRPC.LowLevel.GRPC (grpcDebug, MetadataMap) import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug)
-- | Models the four types of RPC call supported by gRPC. We currently only -- | Models the four types of RPC call supported by gRPC. We currently only
-- support the first alternative, and only in a preliminary fashion. -- support the first alternative, and only in a preliminary fashion.
@ -51,59 +54,36 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType,
-- | Represents one GRPC call (i.e. request) on the client. -- | Represents one GRPC call (i.e. request) on the client.
-- This is used to associate send/receive 'Op's with a request. -- This is used to associate send/receive 'Op's with a request.
data ClientCall = ClientCall {internalClientCall :: C.Call} data ClientCall = ClientCall { unClientCall :: C.Call }
-- | Represents one registered GRPC call on the server. -- | Represents one registered GRPC call on the server. Contains pointers to all
-- Contains pointers to all the C state needed to respond to a registered call. -- the C state needed to respond to a registered call.
data ServerRegCall = ServerRegCall data ServerCall = ServerCall
{internalServerRegCall :: C.Call, { unServerCall :: C.Call,
requestMetadataRecvReg :: Ptr C.MetadataArray, requestMetadataRecv :: Ptr C.MetadataArray,
optionalPayload :: Ptr C.ByteBuffer, optionalPayload :: Ptr C.ByteBuffer,
parentPtrReg :: Maybe (Ptr C.Call), parentPtr :: Maybe (Ptr C.Call),
callDeadline :: C.CTimeSpecPtr callDeadline :: C.CTimeSpecPtr
} }
serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap serverCallGetMetadata :: ServerCall -> IO MetadataMap
serverRegCallGetMetadata ServerRegCall{..} = do serverCallGetMetadata ServerCall{..} = do
marray <- peek requestMetadataRecvReg marray <- peek requestMetadataRecv
C.getAllMetadataArray marray C.getAllMetadataArray marray
-- | Extract the client request body from the given registered call, if present. -- | Extract the client request body from the given call, if present. TODO: the
-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library -- reason this returns @Maybe ByteString@ is because the gRPC library calls the
-- calls the underlying out parameter "optional_payload". I am not sure exactly -- underlying out parameter "optional_payload". I am not sure exactly in what
-- in what cases it won't be present. The C++ library checks a -- cases it won't be present. The C++ library checks a has_request_payload_ bool
-- has_request_payload_ bool and passes in nullptr to request_registered_call -- and passes in nullptr to request_registered_call if the bool is false, so we
-- if the bool is false, so we may be able to do the payload present/absent -- may be able to do the payload present/absent check earlier.
-- check earlier. serverCallGetPayload :: ServerCall -> IO (Maybe ByteString)
serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) serverCallGetPayload ServerCall{..} = do
serverRegCallGetPayload ServerRegCall{..} = do
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
if rawPtr == nullPtr if rawPtr == nullPtr
then return Nothing then return Nothing
else Just <$> C.copyByteBufferToByteString bb else Just <$> C.copyByteBufferToByteString bb
-- | Represents one unregistered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to an unregistered
-- call.
data ServerUnregCall = ServerUnregCall
{internalServerUnregCall :: C.Call,
requestMetadataRecvUnreg :: Ptr C.MetadataArray,
parentPtrUnreg :: Maybe (Ptr C.Call),
callDetails :: C.CallDetails}
serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap
serverUnregCallGetMetadata ServerUnregCall{..} = do
marray <- peek requestMetadataRecvUnreg
C.getAllMetadataArray marray
serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName
serverUnregCallGetMethodName ServerUnregCall{..} =
MethodName <$> C.callDetailsGetMethod callDetails
serverUnregCallGetHost :: ServerUnregCall -> IO Host
serverUnregCallGetHost ServerUnregCall{..} =
Host <$> C.callDetailsGetHost callDetails
debugClientCall :: ClientCall -> IO () debugClientCall :: ClientCall -> IO ()
{-# INLINE debugClientCall #-} {-# INLINE debugClientCall #-}
#ifdef DEBUG #ifdef DEBUG
@ -113,84 +93,47 @@ debugClientCall (ClientCall (C.Call ptr)) =
debugClientCall = const $ return () debugClientCall = const $ return ()
#endif #endif
debugServerRegCall :: ServerRegCall -> IO () debugServerCall :: ServerCall -> IO ()
#ifdef DEBUG #ifdef DEBUG
debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do
grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr) grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr)
grpcDebug $ "debugServerRegCall: metadata ptr: " grpcDebug $ "debugServerCall(R): metadata ptr: "
++ show (requestMetadataRecvReg call) ++ show (requestMetadataRecv call)
metadataArr <- peek (requestMetadataRecvReg call) metadataArr <- peek (requestMetadataRecv call)
metadata <- C.getAllMetadataArray metadataArr metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata) grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata)
grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call) grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call)
payload <- peek (optionalPayload call) payload <- peek (optionalPayload call)
bs <- C.copyByteBufferToByteString payload bs <- C.copyByteBufferToByteString payload
grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs
forM_ (parentPtrReg call) $ \parentPtr' -> do forM_ (parentPtr call) $ \parentPtr' -> do
grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr' (C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerRegCall: parent: " ++ show parent grpcDebug $ "debugServerCall(R): parent: " ++ show parent
grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call) grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call)
timespec <- peek (callDeadline call) timespec <- peek (callDeadline call)
grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec) grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec)
#else #else
{-# INLINE debugServerRegCall #-} {-# INLINE debugServerCall #-}
debugServerRegCall = const $ return () debugServerCall = const $ return ()
#endif #endif
debugServerUnregCall :: ServerUnregCall -> IO ()
#ifdef DEBUG
debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do
grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr)
grpcDebug $ "debugServerUnregCall: metadata ptr: "
++ show (requestMetadataRecvUnreg call)
metadataArr <- peek (requestMetadataRecvUnreg call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata)
forM_ (parentPtrUnreg call) $ \parentPtr' -> do
grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerRegCall: parent: " ++ show parent
grpcDebug $ "debugServerUnregCall: callDetails ptr: "
++ show (callDetails call)
--TODO: need functions for getting data out of call_details.
#else
{-# INLINE debugServerUnregCall #-}
debugServerUnregCall = const $ return ()
#endif
destroyClientCall :: ClientCall -> IO () destroyClientCall :: ClientCall -> IO ()
destroyClientCall ClientCall{..} = do destroyClientCall ClientCall{..} = do
grpcDebug "Destroying client-side call object." grpcDebug "Destroying client-side call object."
C.grpcCallDestroy internalClientCall C.grpcCallDestroy unClientCall
destroyServerRegCall :: ServerRegCall -> IO () destroyServerCall :: ServerCall -> IO ()
destroyServerRegCall call@ServerRegCall{..} = do destroyServerCall call@ServerCall{..} = do
grpcDebug "destroyServerRegCall: entered." grpcDebug "destroyServerCall(R): entered."
debugServerRegCall call debugServerCall call
grpcDebug $ "Destroying server-side call object: " grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
++ show internalServerRegCall C.grpcCallDestroy unServerCall
C.grpcCallDestroy internalServerRegCall grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg C.metadataArrayDestroy requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecvReg
grpcDebug $ "destroying optional payload" ++ show optionalPayload grpcDebug $ "destroying optional payload" ++ show optionalPayload
C.destroyReceivingByteBuffer optionalPayload C.destroyReceivingByteBuffer optionalPayload
grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtrReg free forM_ parentPtr free
grpcDebug $ "destroying deadline." ++ show callDeadline grpcDebug $ "destroying deadline." ++ show callDeadline
C.timespecDestroy callDeadline C.timespecDestroy callDeadline
destroyServerUnregCall :: ServerUnregCall -> IO ()
destroyServerUnregCall call@ServerUnregCall{..} = do
grpcDebug "destroyServerUnregCall: entered."
debugServerUnregCall call
grpcDebug $ "Destroying server-side call object: "
++ show internalServerUnregCall
C.grpcCallDestroy internalServerUnregCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg
C.metadataArrayDestroy requestMetadataRecvUnreg
grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg
forM_ parentPtrUnreg free
grpcDebug $ "destroying call details: " ++ show callDetails
C.destroyCallDetails callDetails

View file

@ -0,0 +1,69 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.Call.Unregistered where
import Control.Monad
import Foreign.Marshal.Alloc (free)
import Foreign.Ptr (Ptr)
import Foreign.Storable (peek)
import Network.GRPC.LowLevel.Call (Host (..), MethodName (..))
import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Metadata as C
-- | Represents one unregistered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to an unregistered
-- call.
data ServerCall = ServerCall
{ unServerCall :: C.Call
, requestMetadataRecv :: Ptr C.MetadataArray
, parentPtr :: Maybe (Ptr C.Call)
, callDetails :: C.CallDetails
}
serverCallGetMetadata :: ServerCall -> IO MetadataMap
serverCallGetMetadata ServerCall{..} = do
marray <- peek requestMetadataRecv
C.getAllMetadataArray marray
serverCallGetMethodName :: ServerCall -> IO MethodName
serverCallGetMethodName ServerCall{..} =
MethodName <$> C.callDetailsGetMethod callDetails
serverCallGetHost :: ServerCall -> IO Host
serverCallGetHost ServerCall{..} =
Host <$> C.callDetailsGetHost callDetails
debugServerCall :: ServerCall -> IO ()
#ifdef DEBUG
debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do
grpcDebug $ "debugServerCall(U): server call: " ++ (show ptr)
grpcDebug $ "debugServerCall(U): metadata ptr: "
++ show (requestMetadataRecv call)
metadataArr <- peek (requestMetadataRecv call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerCall(U): metadata received: " ++ (show metadata)
forM_ (parentPtr call) $ \parentPtr' -> do
grpcDebug $ "debugServerCall(U): parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerCall(U): parent: " ++ show parent
grpcDebug $ "debugServerCall(U): callDetails ptr: "
++ show (callDetails call)
--TODO: need functions for getting data out of call_details.
#else
{-# INLINE debugServerCall #-}
debugServerCall = const $ return ()
#endif
destroyServerCall :: ServerCall -> IO ()
destroyServerCall call@ServerCall{..} = do
grpcDebug "destroyServerCall(U): entered."
debugServerCall call
grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
C.grpcCallDestroy unServerCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtr free
grpcDebug $ "destroying call details: " ++ show callDetails
C.destroyCallDetails callDetails

View file

@ -1,5 +1,8 @@
{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecordWildCards #-}
-- | This module defines data structures and operations pertaining to registered
-- clients using registered calls; for unregistered support, see
-- `Network.GRPC.LowLevel.Client.Unregistered`.
module Network.GRPC.LowLevel.Client where module Network.GRPC.LowLevel.Client where
import Control.Exception (bracket, finally) import Control.Exception (bracket, finally)
@ -7,13 +10,13 @@ import Control.Monad (join)
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr) import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Time as C
import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Op as C
import qualified Network.GRPC.Unsafe.Time as C
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Op
-- | Represents the context needed to perform client-side gRPC operations. -- | Represents the context needed to perform client-side gRPC operations.
@ -55,7 +58,7 @@ clientConnectivity Client{..} =
C.grpcChannelCheckConnectivityState clientChannel False C.grpcChannelCheckConnectivityState clientChannel False
-- | Register a method on the client so that we can call it with -- | Register a method on the client so that we can call it with
-- 'clientRegisteredRequest'. -- 'clientRequest'.
clientRegisterMethod :: Client clientRegisterMethod :: Client
-> MethodName -> MethodName
-- ^ method name, e.g. "/foo" -- ^ method name, e.g. "/foo"
@ -71,53 +74,30 @@ clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented."
-- | Create a new call on the client for a registered method. -- | Create a new call on the client for a registered method.
-- Returns 'Left' if the CQ is shutting down or if the job to create a call -- Returns 'Left' if the CQ is shutting down or if the job to create a call
-- timed out. -- timed out.
clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds clientCreateCall :: Client
-> RegisteredMethod
-> TimeoutSeconds
-> IO (Either GRPCIOError ClientCall) -> IO (Either GRPCIOError ClientCall)
clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do clientCreateCall Client{..} RegisteredMethod{..} timeout = do
let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though. let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though.
C.withDeadlineSeconds timeout $ \deadline -> do C.withDeadlineSeconds timeout $ \deadline -> do
channelCreateRegisteredCall clientChannel parentCall C.propagateDefaults channelCreateCall clientChannel parentCall C.propagateDefaults
clientCQ methodHandle deadline clientCQ methodHandle deadline
-- TODO: the error-handling refactor made this quite ugly. It could be fixed -- TODO: the error-handling refactor made this quite ugly. It could be fixed
-- by switching to ExceptT IO. -- by switching to ExceptT IO.
-- | Handles safe creation and cleanup of a client call -- | Handles safe creation and cleanup of a client call
withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds
-> (ClientCall
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientRegisteredCall client regmethod timeout f = do
createResult <- clientCreateRegisteredCall client regmethod timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientRegisteredCall: destroying."
>> destroyClientCall c
-- | Create a call on the client for an endpoint without using the
-- method registration machinery. In practice, we'll probably only use the
-- registered method version, but we include this for completeness and testing.
clientCreateCall :: Client
-> MethodName
-> TimeoutSeconds
-> IO (Either GRPCIOError ClientCall)
clientCreateCall Client{..} meth timeout = do
let parentCall = C.Call nullPtr
C.withDeadlineSeconds timeout $ \deadline -> do
channelCreateCall clientChannel parentCall C.propagateDefaults
clientCQ meth (clientEndpoint clientConfig) deadline
withClientCall :: Client withClientCall :: Client
-> MethodName -> RegisteredMethod
-> TimeoutSeconds -> TimeoutSeconds
-> (ClientCall -> IO (Either GRPCIOError a)) -> (ClientCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a) -> IO (Either GRPCIOError a)
withClientCall client method timeout f = do withClientCall client regmethod timeout f = do
createResult <- clientCreateCall client method timeout createResult <- clientCreateCall client regmethod timeout
case createResult of case createResult of
Left x -> return $ Left x Left x -> return $ Left x
Right call -> f call `finally` logDestroy call Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientCall: destroying." where logDestroy c = grpcDebug "withClientCall(R): destroying."
>> destroyClientCall c >> destroyClientCall c
data NormalRequestResult = NormalRequestResult data NormalRequestResult = NormalRequestResult
@ -154,71 +134,48 @@ compileNormalRequestResults x =
-- server's response. TODO: This is preliminary until we figure out how many -- server's response. TODO: This is preliminary until we figure out how many
-- different variations on sending request ops will be needed for full gRPC -- different variations on sending request ops will be needed for full gRPC
-- functionality. -- functionality.
clientRegisteredRequest :: Client clientRequest :: Client
-> RegisteredMethod -> RegisteredMethod
-> TimeoutSeconds -> TimeoutSeconds
-- ^ Timeout of both the grpc_call and the -- ^ Timeout of both the grpc_call and the max time to wait for
-- max time to wait for the completion of the batch. -- the completion of the batch. TODO: I think we will need to
-- TODO: I think we will need to decouple the -- decouple the lifetime of the call from the queue deadline once
-- lifetime of the call from the queue deadline once -- we expose functionality for streaming calls, where one call
-- we expose functionality for streaming calls, where -- object persists across many batches.
-- one call object persists across many batches.
-> ByteString -> ByteString
-- ^ The body of the request. -- ^ The body of the request
-> MetadataMap -> MetadataMap
-- ^ Metadata to send with the request. -- ^ Metadata to send with the request
-> IO (Either GRPCIOError NormalRequestResult) -> IO (Either GRPCIOError NormalRequestResult)
clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) clientRequest client@(Client{..}) rm@(RegisteredMethod{..})
timeLimit body meta = timeLimit body meta =
fmap join $ case methodType of fmap join $ case methodType of
Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do Normal -> withClientCall client rm timeLimit $ \call -> do
grpcDebug "clientRegisteredRequest: created call." grpcDebug "clientRequest(R): created call."
debugClientCall call debugClientCall call
let call' = unClientCall call
-- NOTE: sendOps and recvOps *must* be in separate batches or -- NOTE: sendOps and recvOps *must* be in separate batches or
-- the client hangs when the server can't be reached. -- the client hangs when the server can't be reached.
let sendOps = [OpSendInitialMetadata meta let sendOps = [OpSendInitialMetadata meta
, OpSendMessage body , OpSendMessage body
, OpSendCloseFromClient] , OpSendCloseFromClient]
sendRes <- runClientOps call clientCQ sendOps timeLimit sendRes <- runOps call' clientCQ sendOps timeLimit
case sendRes of case sendRes of
Left x -> do grpcDebug "clientRegisteredRequest: batch error." Left x -> do grpcDebug "clientRequest(R) : batch error."
return $ Left x return $ Left x
Right rs -> do Right rs -> do
let recvOps = [OpRecvInitialMetadata, let recvOps = [OpRecvInitialMetadata,
OpRecvMessage, OpRecvMessage,
OpRecvStatusOnClient] OpRecvStatusOnClient]
recvRes <- runClientOps call clientCQ recvOps timeLimit recvRes <- runOps call' clientCQ recvOps timeLimit
case recvRes of case recvRes of
Left x -> do Left x -> do
grpcDebug "clientRegisteredRequest: batch error." grpcDebug "clientRequest(R): batch error."
return $ Left x return $ Left x
Right rs' -> do Right rs' -> do
return $ Right $ compileNormalRequestResults (rs ++ rs') return $ Right $ compileNormalRequestResults (rs ++ rs')
_ -> error "Streaming methods not yet implemented." _ -> error "Streaming methods not yet implemented."
-- | Makes a normal (non-streaming) request without needing to register a method
-- first. Probably only useful for testing. TODO: This is preliminary, like
-- 'clientRegisteredRequest'.
clientRequest :: Client
-> MethodName
-- ^ Method name, e.g. "/foo"
-> TimeoutSeconds
-- ^ "Number of seconds until request times out"
-> ByteString
-- ^ Request body.
-> MetadataMap
-- ^ Request metadata.
-> IO (Either GRPCIOError NormalRequestResult)
clientRequest client@Client{..} meth timeLimit body meta =
fmap join $ do
withClientCall client meth timeLimit $ \call -> do
let ops = clientNormalRequestOps body meta
results <- runClientOps call clientCQ ops timeLimit
grpcDebug "clientRequest: ops ran."
case results of
Left x -> return $ Left x
Right rs -> return $ Right $ compileNormalRequestResults rs
clientNormalRequestOps :: ByteString -> MetadataMap -> [Op] clientNormalRequestOps :: ByteString -> MetadataMap -> [Op]
clientNormalRequestOps body metadata = clientNormalRequestOps body metadata =
[OpSendInitialMetadata metadata, [OpSendInitialMetadata metadata,

View file

@ -0,0 +1,70 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.Client.Unregistered where
import Control.Exception (finally)
import Control.Monad (join)
import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.Client (Client (..),
NormalRequestResult (..),
clientEndpoint,
clientNormalRequestOps,
compileNormalRequestResults)
import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds)
import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
-- | Create a call on the client for an endpoint without using the
-- method registration machinery. In practice, we'll probably only use the
-- registered method version, but we include this for completeness and testing.
clientCreateCall :: Client
-> MethodName
-> TimeoutSeconds
-> IO (Either GRPCIOError ClientCall)
clientCreateCall Client{..} meth timeout = do
let parentCall = C.Call nullPtr
C.withDeadlineSeconds timeout $ \deadline -> do
U.channelCreateCall clientChannel parentCall C.propagateDefaults
clientCQ meth (clientEndpoint clientConfig) deadline
withClientCall :: Client
-> MethodName
-> TimeoutSeconds
-> (ClientCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientCall client method timeout f = do
createResult <- clientCreateCall client method timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientCall(U): destroying."
>> destroyClientCall c
-- | Makes a normal (non-streaming) request without needing to register a method
-- first. Probably only useful for testing.
clientRequest :: Client
-> MethodName
-- ^ Method name, e.g. "/foo"
-> TimeoutSeconds
-- ^ "Number of seconds until request times out"
-> ByteString
-- ^ Request body.
-> MetadataMap
-- ^ Request metadata.
-> IO (Either GRPCIOError NormalRequestResult)
clientRequest client@Client{..} meth timeLimit body meta =
fmap join $ do
withClientCall client meth timeLimit $ \call -> do
let ops = clientNormalRequestOps body meta
results <- runOps (unClientCall call) clientCQ ops timeLimit
grpcDebug "clientRequest(U): ops ran."
case results of
Left x -> return $ Left x
Right rs -> return $ Right $ compileNormalRequestResults rs

View file

@ -3,6 +3,12 @@
-- cause race conditions, so we only expose functions that are thread safe. -- cause race conditions, so we only expose functions that are thread safe.
-- However, some of the functions we export here can cause memory leaks if used -- However, some of the functions we export here can cause memory leaks if used
-- improperly. -- improperly.
--
-- When definition operations which pertain to calls, this module only provides
-- definitions for registered calls; for unregistered variants, see
-- `Network.GRPC.LowLevel.CompletionQueue.Unregistered`. Type definitions and
-- implementation details to both are kept in
-- `Network.GRPC.LowLevel.CompletionQueue.Internal`.
{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecordWildCards #-}
@ -13,28 +19,24 @@ module Network.GRPC.LowLevel.CompletionQueue
, shutdownCompletionQueue , shutdownCompletionQueue
, pluck , pluck
, startBatch , startBatch
, channelCreateRegisteredCall
, channelCreateCall , channelCreateCall
, TimeoutSeconds , TimeoutSeconds
, isEventSuccessful , isEventSuccessful
, serverRegisterCompletionQueue , serverRegisterCompletionQueue
, serverShutdownAndNotify , serverShutdownAndNotify
, serverRequestRegisteredCall
, serverRequestCall , serverRequestCall
, newTag , newTag
) )
where where
import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, check, retry) import Control.Concurrent.STM (atomically, check)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, import Control.Concurrent.STM.TVar (newTVarIO, readTVar,
readTVar, writeTVar) writeTVar)
import Control.Exception (bracket) import Control.Exception (bracket)
import Data.IORef (IORef, atomicModifyIORef', import Data.IORef (newIORef)
newIORef)
import Data.List (intersperse) import Data.List (intersperse)
import Foreign.Marshal.Alloc (free, malloc) import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Ptr (nullPtr, plusPtr)
import Foreign.Storable (peek) import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Constants as C
@ -45,98 +47,7 @@ import System.Timeout (timeout)
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.CompletionQueue.Internal
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, which must reach zero before the shutdown
-- can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches gRPC operations ('Op's) to finish running, as
-- well as wait for various other operations, such as server shutdown, pinging,
-- checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
-- ^ All access to this field must be
-- guarded by a check of 'shuttingDown'.
currentPluckers :: TVar Int,
-- ^ Used to limit the number of
-- concurrent calls to pluck on this
-- queue.
-- The max value is set by gRPC in
-- 'C.maxCompletionQueuePluckers'
currentPushers :: TVar Int,
-- ^ Used to prevent new work from
-- being pushed onto the queue when
-- the queue begins to shut down.
shuttingDown :: TVar Bool,
-- ^ Used to prevent new pluck calls on
-- the queue when the queue begins to
-- shut down.
nextTag :: IORef Int
-- ^ Used to supply unique tags for work
-- items pushed onto the queue.
}
-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'.
-- This will eventually wrap around after reaching @maxBound :: Int@, but from a
-- practical perspective, that should be safe.
newTag :: CompletionQueue -> IO C.Tag
newTag CompletionQueue{..} = do
i <- atomicModifyIORef' nextTag (\i -> (i+1,i))
return $ C.Tag $ plusPtr nullPtr i
maxWorkPushers :: Int
maxWorkPushers = 100 --TODO: figure out what this should be.
data CQOpType = Push | Pluck deriving (Show, Eq, Enum)
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount Push = currentPushers
getCount Pluck = currentPluckers
getLimit :: CQOpType -> Int
getLimit Push = maxWorkPushers
getLimit Pluck = C.maxCompletionQueuePluckers
-- | Safely brackets an operation that pushes work onto or plucks results from
-- the given 'CompletionQueue'.
withPermission :: CQOpType
-> CompletionQueue
-> IO (Either GRPCIOError a)
-> IO (Either GRPCIOError a)
withPermission op cq f =
bracket acquire release doOp
where acquire = atomically $ do
isShuttingDown <- readTVar (shuttingDown cq)
if isShuttingDown
then return False
else do currCount <- readTVar $ getCount op cq
if currCount < getLimit op
then modifyTVar' (getCount op cq) (+1) >> return True
else retry
doOp gotResource = if gotResource
then f
else return $ Left GRPCIOShutdown
release gotResource =
if gotResource
then atomically $ modifyTVar' (getCount op cq) (subtract 1)
else return ()
withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a
withCompletionQueue grpc = bracket (createCompletionQueue grpc) withCompletionQueue grpc = bracket (createCompletionQueue grpc)
@ -151,35 +62,6 @@ createCompletionQueue _ = do
nextTag <- newIORef minBound nextTag <- newIORef minBound
return $ CompletionQueue{..} return $ CompletionQueue{..}
type TimeoutSeconds = Int
-- | Translate 'C.Event' to an error. The caller is responsible for ensuring
-- that the event actually corresponds to an error condition; a successful event
-- will be translated to a 'GRPCIOUnknownError'.
eventToError :: C.Event -> (Either GRPCIOError a)
eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
eventToError _ = Left GRPCIOUnknownError
-- | Returns true iff the given grpc_event was a success.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful (C.Event C.OpComplete True _) = True
isEventSuccessful _ = False
-- | Waits for the given number of seconds for the given tag to appear on the
-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
-- down and cannot handle new requests.
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
-> IO (Either GRPCIOError ())
pluck cq@CompletionQueue{..} tag waitSeconds = do
grpcDebug $ "pluck: called with tag: " ++ show tag
++ " and wait: " ++ show waitSeconds
withPermission Pluck cq $ do
C.withDeadlineSeconds waitSeconds $ \deadline -> do
ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
grpcDebug $ "pluck: finished. Event: " ++ show ev
return $ if isEventSuccessful ev then Right () else eventToError ev
-- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere
-- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the
-- case, switch these to 'Maybe'. -- case, switch these to 'Maybe'.
@ -222,14 +104,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
C.QueueTimeout -> drainLoop C.QueueTimeout -> drainLoop
C.OpComplete -> drainLoop C.OpComplete -> drainLoop
channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask channelCreateCall :: C.Channel
-> CompletionQueue -> C.CallHandle -> C.Call
-> C.PropagationMask
-> CompletionQueue
-> C.CallHandle
-> C.CTimeSpecPtr -> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall) -> IO (Either GRPCIOError ClientCall)
channelCreateRegisteredCall channelCreateCall
chan parent mask cq@CompletionQueue{..} handle deadline = chan parent mask cq@CompletionQueue{..} handle deadline =
withPermission Push cq $ do withPermission Push cq $ do
grpcDebug $ "channelCreateRegisteredCall: call with " grpcDebug $ "channelCreateCall: call with "
++ concat (intersperse " " [show chan, show parent, show mask, ++ concat (intersperse " " [show chan, show parent, show mask,
show unsafeCQ, show handle, show unsafeCQ, show handle,
show deadline]) show deadline])
@ -237,25 +122,14 @@ channelCreateRegisteredCall
handle deadline C.reserved handle deadline C.reserved
return $ Right $ ClientCall call return $ Right $ ClientCall call
channelCreateCall :: C.Channel
-> C.Call
-> C.PropagationMask
-> CompletionQueue
-> MethodName
-> Endpoint
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
withPermission Push cq $ do
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ
(unMethodName meth) (unEndpoint endpt) deadline C.reserved
return $ Right $ ClientCall call
-- | Create the call object to handle a registered call. -- | Create the call object to handle a registered call.
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds serverRequestCall :: C.Server
-> RegisteredMethod -> MetadataMap -> CompletionQueue
-> IO (Either GRPCIOError ServerRegCall) -> TimeoutSeconds
serverRequestRegisteredCall -> RegisteredMethod
-> MetadataMap
-> IO (Either GRPCIOError ServerCall)
serverRequestCall
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta = server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta =
withPermission Push cq $ do withPermission Push cq $ do
-- TODO: Is gRPC supposed to populate this deadline? -- TODO: Is gRPC supposed to populate this deadline?
@ -278,81 +152,33 @@ serverRequestRegisteredCall
callError <- C.grpcServerRequestRegisteredCall callError <- C.grpcServerRequestRegisteredCall
server methodHandle callPtr deadline server methodHandle callPtr deadline
metadataArray bbPtr unsafeCQ unsafeCQ tag metadataArray bbPtr unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestRegisteredCall: callError: " grpcDebug $ "serverRequestCall(R): callError: "
++ show callError ++ show callError
if callError /= C.CallOk if callError /= C.CallOk
then do grpcDebug "serverRequestRegisteredCall: callError. cleaning up" then do grpcDebug "serverRequestCall(R): callError. cleaning up"
failureCleanup deadline callPtr metadataArrayPtr bbPtr failureCleanup deadline callPtr metadataArrayPtr bbPtr
return $ Left $ GRPCIOCallError callError return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit else do pluckResult <- pluck cq tag timeLimit
grpcDebug "serverRequestRegisteredCall: finished pluck." grpcDebug "serverRequestCall(R): finished pluck."
case pluckResult of case pluckResult of
Left x -> do Left x -> do
grpcDebug "serverRequestRegisteredCall: cleanup pluck err" grpcDebug "serverRequestCall(R): cleanup pluck err"
failureCleanup deadline callPtr metadataArrayPtr bbPtr failureCleanup deadline callPtr metadataArrayPtr bbPtr
return $ Left x return $ Left x
Right () -> do Right () -> do
rawCall <- peek callPtr rawCall <- peek callPtr
let assembledCall = ServerRegCall rawCall metadataArrayPtr let assembledCall = ServerCall rawCall metadataArrayPtr
bbPtr Nothing deadline bbPtr Nothing deadline
return $ Right assembledCall return $ Right assembledCall
-- TODO: see TODO for failureCleanup in serverRequestCall. -- TODO: see TODO for failureCleanup in serverRequestCall.
where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do
threadDelaySecs 30 threadDelaySecs 30
grpcDebug "serverRequestRegisteredCall: doing delayed cleanup." grpcDebug "serverRequestCall(R): doing delayed cleanup."
C.timespecDestroy deadline C.timespecDestroy deadline
free callPtr free callPtr
C.metadataArrayDestroy metadataArrayPtr C.metadataArrayDestroy metadataArrayPtr
free bbPtr free bbPtr
serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> IO (Either GRPCIOError ServerUnregCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc
grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr
callDetails <- C.createCallDetails
metadataArrayPtr <- C.metadataArrayCreate
metadataArray <- peek metadataArrayPtr
tag <- newTag cq
callError <- C.grpcServerRequestCall server callPtr callDetails
metadataArray unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall: callError was " ++ show callError
if callError /= C.CallOk
then do grpcDebug "serverRequestCall: got call error; cleaning up."
failureCleanup callPtr callDetails metadataArrayPtr
return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit
grpcDebug $ "serverRequestCall: pluckResult was "
++ show pluckResult
case pluckResult of
Left x -> do
grpcDebug "serverRequestCall: pluck error; cleaning up."
failureCleanup callPtr callDetails
metadataArrayPtr
return $ Left x
Right () -> do
rawCall <- peek callPtr
let call = ServerUnregCall rawCall
metadataArrayPtr
Nothing
callDetails
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random
-- amount of time, even after returning from the only call that uses them.
-- This results in malloc errors if
-- gRPC tries to modify them after we free them. To work around it,
-- we sleep for a while before freeing the objects. We should find a
-- permanent solution that's more robust.
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall: doing delayed cleanup."
free callPtr
C.destroyCallDetails callDetails
C.metadataArrayDestroy metadataArrayPtr
return ()
-- | Register the server's completion queue. Must be done before the server is -- | Register the server's completion queue. Must be done before the server is
-- started. -- started.
serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO () serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO ()
@ -362,6 +188,3 @@ serverRegisterCompletionQueue server CompletionQueue{..} =
serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO ()
serverShutdownAndNotify server CompletionQueue{..} tag = serverShutdownAndNotify server CompletionQueue{..} tag =
C.grpcServerShutdownAndNotify server unsafeCQ tag C.grpcServerShutdownAndNotify server unsafeCQ tag
threadDelaySecs :: Int -> IO ()
threadDelaySecs = threadDelay . (* 10^(6::Int))

View file

@ -0,0 +1,134 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Internal where
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar)
import Control.Exception (bracket)
import Data.IORef (IORef, atomicModifyIORef')
import Foreign.Ptr (nullPtr, plusPtr)
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, which must reach zero before the shutdown
-- can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches gRPC operations ('Op's) to finish running, as
-- well as wait for various other operations, such as server shutdown, pinging,
-- checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
-- ^ All access to this field must be
-- guarded by a check of 'shuttingDown'.
currentPluckers :: TVar Int,
-- ^ Used to limit the number of
-- concurrent calls to pluck on this
-- queue.
-- The max value is set by gRPC in
-- 'C.maxCompletionQueuePluckers'
currentPushers :: TVar Int,
-- ^ Used to prevent new work from
-- being pushed onto the queue when
-- the queue begins to shut down.
shuttingDown :: TVar Bool,
-- ^ Used to prevent new pluck calls on
-- the queue when the queue begins to
-- shut down.
nextTag :: IORef Int
-- ^ Used to supply unique tags for work
-- items pushed onto the queue.
}
type TimeoutSeconds = Int
data CQOpType = Push | Pluck deriving (Show, Eq, Enum)
-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'.
-- This will eventually wrap around after reaching @maxBound :: Int@, but from a
-- practical perspective, that should be safe.
newTag :: CompletionQueue -> IO C.Tag
newTag CompletionQueue{..} = do
i <- atomicModifyIORef' nextTag (\i -> (i+1,i))
return $ C.Tag $ plusPtr nullPtr i
-- | Safely brackets an operation that pushes work onto or plucks results from
-- the given 'CompletionQueue'.
withPermission :: CQOpType
-> CompletionQueue
-> IO (Either GRPCIOError a)
-> IO (Either GRPCIOError a)
withPermission op cq f =
bracket acquire release doOp
where acquire = atomically $ do
isShuttingDown <- readTVar (shuttingDown cq)
if isShuttingDown
then return False
else do currCount <- readTVar $ getCount op cq
if currCount < getLimit op
then modifyTVar' (getCount op cq) (+1) >> return True
else retry
doOp gotResource = if gotResource
then f
else return $ Left GRPCIOShutdown
release gotResource =
if gotResource
then atomically $ modifyTVar' (getCount op cq) (subtract 1)
else return ()
-- | Waits for the given number of seconds for the given tag to appear on the
-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
-- down and cannot handle new requests.
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
-> IO (Either GRPCIOError ())
pluck cq@CompletionQueue{..} tag waitSeconds = do
grpcDebug $ "pluck: called with tag: " ++ show tag
++ " and wait: " ++ show waitSeconds
withPermission Pluck cq $ do
C.withDeadlineSeconds waitSeconds $ \deadline -> do
ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
grpcDebug $ "pluck: finished. Event: " ++ show ev
return $ if isEventSuccessful ev then Right () else eventToError ev
-- | Translate 'C.Event' to an error. The caller is responsible for ensuring
-- that the event actually corresponds to an error condition; a successful event
-- will be translated to a 'GRPCIOUnknownError'.
eventToError :: C.Event -> (Either GRPCIOError a)
eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
eventToError _ = Left GRPCIOUnknownError
-- | Returns true iff the given grpc_event was a success.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful (C.Event C.OpComplete True _) = True
isEventSuccessful _ = False
maxWorkPushers :: Int
maxWorkPushers = 100 --TODO: figure out what this should be.
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount Push = currentPushers
getCount Pluck = currentPluckers
getLimit :: CQOpType -> Int
getLimit Push = maxWorkPushers
getLimit Pluck = C.maxCompletionQueuePluckers

View file

@ -0,0 +1,80 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Unregistered where
import Control.Concurrent (forkIO)
import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Storable (peek)
import Network.GRPC.LowLevel.Call
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
import Network.GRPC.LowLevel.CompletionQueue.Internal
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Time as C
channelCreateCall :: C.Channel
-> C.Call
-> C.PropagationMask
-> CompletionQueue
-> MethodName
-> Endpoint
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
withPermission Push cq $ do
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ
(unMethodName meth) (unEndpoint endpt) deadline C.reserved
return $ Right $ ClientCall call
serverRequestCall :: C.Server
-> CompletionQueue
-> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc
grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr
callDetails <- C.createCallDetails
metadataArrayPtr <- C.metadataArrayCreate
metadataArray <- peek metadataArrayPtr
tag <- newTag cq
callError <- C.grpcServerRequestCall server callPtr callDetails
metadataArray unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall: callError was " ++ show callError
if callError /= C.CallOk
then do grpcDebug "serverRequestCall: got call error; cleaning up."
failureCleanup callPtr callDetails metadataArrayPtr
return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit
grpcDebug $ "serverRequestCall: pluckResult was "
++ show pluckResult
case pluckResult of
Left x -> do
grpcDebug "serverRequestCall: pluck error; cleaning up."
failureCleanup callPtr callDetails
metadataArrayPtr
return $ Left x
Right () -> do
rawCall <- peek callPtr
let call = U.ServerCall rawCall
metadataArrayPtr
Nothing
callDetails
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random
-- amount of time, even after returning from the only call that uses them.
-- This results in malloc errors if
-- gRPC tries to modify them after we free them. To work around it,
-- we sleep for a while before freeing the objects. We should find a
-- permanent solution that's more robust.
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall: doing delayed cleanup."
free callPtr
C.destroyCallDetails callDetails
C.metadataArrayDestroy metadataArrayPtr
return ()

View file

@ -3,12 +3,7 @@
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
module Network.GRPC.LowLevel.GRPC where module Network.GRPC.LowLevel.GRPC where
{- import Control.Concurrent (threadDelay)
-- TODO: remove if not needed
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Except (ExceptT(..), runExceptT, throwError,
MonadError)
-}
import Control.Exception import Control.Exception
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.Map as M import qualified Data.Map as M
@ -63,6 +58,9 @@ grpcDebug str = do tid <- myThreadId
grpcDebug _ = return () grpcDebug _ = return ()
#endif #endif
threadDelaySecs :: Int -> IO ()
threadDelaySecs = threadDelay . (* 10^(6::Int))
{- {-
-- TODO: remove this once finally decided on whether to use it. -- TODO: remove this once finally decided on whether to use it.
-- | Monad for running gRPC operations. -- | Monad for running gRPC operations.

View file

@ -7,22 +7,20 @@ import Control.Exception
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes) import Data.Maybe (catMaybes)
import Data.String (IsString)
import Foreign.C.String (CString) import Foreign.C.String (CString)
import Foreign.C.Types (CInt) import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc, import Foreign.Marshal.Alloc (free, malloc,
mallocBytes) mallocBytes)
import Foreign.Ptr (Ptr, nullPtr) import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek, poke) import Foreign.Storable (peek, poke)
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C (Call) import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Op as C
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
-- | Sum describing all possible send and receive operations that can be batched -- | Sum describing all possible send and receive operations that can be batched
-- and executed by gRPC. Usually these are processed in a handful of -- and executed by gRPC. Usually these are processed in a handful of
-- combinations depending on the 'MethodType' of the call being run. -- combinations depending on the 'MethodType' of the call being run.
@ -183,20 +181,36 @@ resultFromOpContext _ = do
grpcDebug "resultFromOpContext: saw non-result op type." grpcDebug "resultFromOpContext: saw non-result op type."
return Nothing return Nothing
--TODO: the list of 'Op's type is less specific than it could be. There are only -- | For a given call, run the given 'Op's on the given completion queue with
-- a few different sequences of 'Op's we will see in practice. Once we figure -- the given tag. Blocks until the ops are complete or the given number of
-- out what those are, we should create a more specific sum type. However, since -- seconds have elapsed. TODO: now that we distinguish between different types
-- ops can fail, the list of 'OpRecvResult' returned by 'runOps' can vary in -- of calls at the type level, we could try to limit the input 'Op's more
-- their contents and are perhaps less amenable to simplification. -- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a
-- In the meantime, from looking at the core tests, it looks like it is safe to -- registered call, because gRPC handles that for us.
-- say that we always get a GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use
-- the same 'Op' twice in the same batch, so we might want to change the list to -- TODO: the list of 'Op's type is less specific than it could be. There are
-- a set. I don't think order matters within a batch. Need to check. -- only a few different sequences of 'Op's we will see in practice. Once we
-- figure out what those are, we should create a more specific sum
-- type. However, since ops can fail, the list of 'OpRecvResult' returned by
-- 'runOps' can vary in their contents and are perhaps less amenable to
-- simplification. In the meantime, from looking at the core tests, it looks
-- like it is safe to say that we always get a
-- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use the same 'Op' twice in
-- the same batch, so we might want to change the list to a set. I don't think
-- order matters within a batch. Need to check.
runOps :: C.Call runOps :: C.Call
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue -> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op] -> [Op]
-- ^ The list of 'Op's to execute.
-> TimeoutSeconds -> TimeoutSeconds
-- ^ How long to block waiting for the tag to appear on the queue. If
-- we time out, the result of this action will be @CallBatchError
-- BatchTimeout@.
-> IO (Either GRPCIOError [OpRecvResult]) -> IO (Either GRPCIOError [OpRecvResult])
runOps call cq ops timeLimit = runOps call cq ops timeLimit =
let l = length ops in let l = length ops in
@ -220,48 +234,11 @@ runOps call cq ops timeLimit =
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
Left err -> return $ Left err Left err -> return $ Left err
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the given number of
-- seconds have elapsed.
-- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we
-- could try to limit the input 'Op's more appropriately. E.g., we don't use
-- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC
-- handles that for us.
runServerRegOps :: ServerRegCall
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-- ^ The list of 'Op's to execute.
-> TimeoutSeconds
-- ^ How long to block waiting for the tag to appear on the
--queue. If we time out, the result of this action will be
-- @CallBatchError BatchTimeout@.
-> IO (Either GRPCIOError [OpRecvResult])
runServerRegOps = runOps . internalServerRegCall
runServerUnregOps :: ServerUnregCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runServerUnregOps = runOps . internalServerUnregCall
-- | Like 'runServerOps', but for client-side calls.
runClientOps :: ClientCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runClientOps = runOps . internalClientCall
-- | If response status info is present in the given 'OpRecvResult's, returns -- | If response status info is present in the given 'OpRecvResult's, returns
-- a tuple of trailing metadata, status code, and status details. -- a tuple of trailing metadata, status code, and status details.
extractStatusInfo :: [OpRecvResult] extractStatusInfo :: [OpRecvResult]
-> Maybe (MetadataMap, C.StatusCode, B.ByteString) -> Maybe (MetadataMap, C.StatusCode, B.ByteString)
extractStatusInfo [] = Nothing extractStatusInfo [] = Nothing
extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) =
Just (meta, code, details) Just (meta, code, details)
extractStatusInfo (_:xs) = extractStatusInfo xs extractStatusInfo (_:xs) = extractStatusInfo xs

View file

@ -1,15 +1,14 @@
{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecordWildCards #-}
-- | This module defines data structures and operations pertaining to registered
-- servers using registered calls; for unregistered support, see
-- `Network.GRPC.LowLevel.Server.Unregistered`.
module Network.GRPC.LowLevel.Server where module Network.GRPC.LowLevel.Server where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket, finally) import Control.Exception (bracket, finally)
import Control.Monad import Control.Monad
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr) import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
TimeoutSeconds, TimeoutSeconds,
@ -17,11 +16,12 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
pluck, pluck,
serverRegisterCompletionQueue, serverRegisterCompletionQueue,
serverRequestCall, serverRequestCall,
serverRequestRegisteredCall,
serverShutdownAndNotify, serverShutdownAndNotify,
shutdownCompletionQueue) shutdownCompletionQueue)
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Op
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
-- | Wraps various gRPC state needed to run a server. -- | Wraps various gRPC state needed to run a server.
data Server = Server data Server = Server
@ -122,40 +122,27 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
-- | Create a 'Call' with which to wait for the invocation of a registered -- | Create a 'Call' with which to wait for the invocation of a registered
-- method. -- method.
serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds serverCreateCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap -> MetadataMap
-> IO (Either GRPCIOError ServerRegCall) -> IO (Either GRPCIOError ServerCall)
serverCreateRegisteredCall Server{..} rm timeLimit initMeta = serverCreateCall Server{..} rm timeLimit initMeta =
serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta serverRequestCall internalServer serverCQ timeLimit rm initMeta
withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds withServerCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap -> MetadataMap
-> (ServerRegCall -> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a) -> IO (Either GRPCIOError a)
withServerRegisteredCall server regmethod timeout initMeta f = do withServerCall server regmethod timeout initMeta f = do
createResult <- serverCreateRegisteredCall server regmethod timeout initMeta createResult <- serverCreateCall server regmethod timeout initMeta
case createResult of case createResult of
Left x -> return $ Left x Left x -> return $ Left x
Right call -> f call `finally` logDestroy call Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." where logDestroy c = grpcDebug "withServerRegisteredCall: destroying."
>> destroyServerRegCall c >> destroyServerCall c
serverCreateUnregCall :: Server -> TimeoutSeconds
-> IO (Either GRPCIOError ServerUnregCall)
serverCreateUnregCall Server{..} timeLimit =
serverRequestCall internalServer serverCQ timeLimit
withServerUnregCall :: Server -> TimeoutSeconds
-> (ServerUnregCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerUnregCall server timeout f = do
createResult <- serverCreateUnregCall server timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerCall: destroying."
>> destroyServerUnregCall c
-- | Sequence of 'Op's needed to receive a normal (non-streaming) call. -- | Sequence of 'Op's needed to receive a normal (non-streaming) call.
serverOpsGetNormalCall :: MetadataMap -> [Op] serverOpsGetNormalCall :: MetadataMap -> [Op]
@ -189,84 +176,49 @@ serverOpsSendNormalRegisteredResponse
OpSendMessage body, OpSendMessage body,
OpSendStatusFromServer trailingMeta code details] OpSendStatusFromServer trailingMeta code details]
-- | A handler for an registered server call; bytestring parameter is request
-- body, with the bytestring response body in the result tuple. The first
-- metadata parameter refers to the request metadata, with the two metadata
-- values in the result tuple being the initial and trailing metadata
-- respectively.
-- TODO: make a more rigid type for this with a Maybe MetadataMap for the
-- trailing meta, and use it for both kinds of call handlers.
type ServerHandler
= ByteString -> MetadataMap
-> IO (ByteString, MetadataMap, MetadataMap, StatusDetails)
-- TODO: we will want to replace this with some more general concept that also -- TODO: we will want to replace this with some more general concept that also
-- works with streaming calls in the future. -- works with streaming calls in the future.
-- | Wait for and then handle a normal (non-streaming) call. -- | Wait for and then handle a normal (non-streaming) call.
serverHandleNormalRegisteredCall :: Server serverHandleNormalCall :: Server
-> RegisteredMethod -> RegisteredMethod
-> TimeoutSeconds -> TimeoutSeconds
-> MetadataMap -> MetadataMap
-- ^ Initial server metadata -- ^ Initial server metadata
-> (ByteString -> MetadataMap -> ServerHandler
-> IO (ByteString,
MetadataMap,
MetadataMap,
StatusDetails))
-- ^ Handler function takes a request body and
-- metadata and returns a response body and
-- metadata.
-> IO (Either GRPCIOError ()) -> IO (Either GRPCIOError ())
serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do
-- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit. -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit.
-- Should we just hard-code time limits instead? Not sure if client -- Should we just hard-code time limits instead? Not sure if client
-- programmer cares, since this function will likely just be put in a loop -- programmer cares, since this function will likely just be put in a loop
-- anyway. -- anyway.
withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do withServerCall s rm timeLimit srvMetadata $ \call -> do
grpcDebug "serverHandleNormalRegisteredCall: starting batch." grpcDebug "serverHandleNormalCall(R): starting batch."
debugServerRegCall call debugServerCall call
payload <- serverRegCallGetPayload call payload <- serverCallGetPayload call
case payload of case payload of
--TODO: what should we do with an empty payload? Have the handler take --TODO: what should we do with an empty payload? Have the handler take
-- @Maybe ByteString@? Need to figure out when/why payload would be empty. -- @Maybe ByteString@? Need to figure out when/why payload would be empty.
Nothing -> error "serverHandleNormalRegisteredCall: payload empty." Nothing -> error "serverHandleNormalCall(R): payload empty."
Just requestBody -> do Just requestBody -> do
requestMeta <- serverRegCallGetMetadata call requestMeta <- serverCallGetMetadata call
(respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta
let status = C.GrpcStatusOk let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalRegisteredResponse let respOps = serverOpsSendNormalRegisteredResponse
respBody initMeta trailingMeta status details respBody initMeta trailingMeta status details
respOpsResults <- runServerRegOps call serverCQ respOps timeLimit respOpsResults <- runOps (unServerCall call) serverCQ respOps timeLimit
grpcDebug "serverHandleNormalRegisteredCall: finished response ops." grpcDebug "serverHandleNormalCall(R): finished response ops."
case respOpsResults of case respOpsResults of
Left x -> return $ Left x Left x -> return $ Left x
Right _ -> return $ Right () Right _ -> return $ Right ()
-- TODO: This is preliminary.
-- We still need to provide the method name to the handler.
-- | Handle one unregistered call.
serverHandleNormalCall :: Server -> TimeoutSeconds
-> MetadataMap
-- ^ Initial server metadata.
-> (ByteString -> MetadataMap -> MethodName
-> IO (ByteString, MetadataMap, StatusDetails))
-- ^ Handler function takes a request body and
-- metadata and returns a response body and metadata.
-> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
withServerUnregCall s timeLimit $ \call -> do
grpcDebug "serverHandleNormalCall: starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata
opResults <- runServerUnregOps call serverCQ recvOps timeLimit
case opResults of
Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting"
return $ Left x
Right [OpRecvMessageResult (Just body)] -> do
requestMeta <- serverUnregCallGetMetadata call
grpcDebug $ "got client metadata: " ++ show requestMeta
methodName <- serverUnregCallGetMethodName call
hostName <- serverUnregCallGetHost call
grpcDebug $ "call_details host is: " ++ show hostName
(respBody, respMetadata, details) <- f body requestMeta methodName
let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalResponse
respBody respMetadata status details
respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit
case respOpsResults of
Left x -> do grpcDebug "serverHandleNormalCall: resp failed."
return $ Left x
Right _ -> grpcDebug "serverHandleNormalCall: ops done."
>> return (Right ())
x -> error $ "impossible pattern match: " ++ show x
_nowarn_unused :: a
_nowarn_unused = undefined threadDelay

View file

@ -0,0 +1,71 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.Server.Unregistered where
import Control.Exception (finally)
import Data.ByteString (ByteString)
import Network.GRPC.LowLevel.Call (MethodName)
import Network.GRPC.LowLevel.Call.Unregistered
import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds)
import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall)
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op (OpRecvResult (..), runOps)
import Network.GRPC.LowLevel.Server (Server (..),
serverOpsGetNormalCall,
serverOpsSendNormalResponse)
import qualified Network.GRPC.Unsafe.Op as C
serverCreateCall :: Server -> TimeoutSeconds
-> IO (Either GRPCIOError ServerCall)
serverCreateCall Server{..} timeLimit =
serverRequestCall internalServer serverCQ timeLimit
withServerCall :: Server -> TimeoutSeconds
-> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerCall server timeout f = do
createResult <- serverCreateCall server timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerCall: destroying."
>> destroyServerCall c
-- | A handler for an unregistered server call; bytestring arguments are the
-- request body and response body respectively.
type ServerHandler
= ByteString -> MetadataMap -> MethodName
-> IO (ByteString, MetadataMap, StatusDetails)
-- | Handle one unregistered call.
serverHandleNormalCall :: Server
-> TimeoutSeconds
-> MetadataMap -- ^ Initial server metadata.
-> ServerHandler
-> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
withServerCall s timeLimit $ \call -> do
grpcDebug "serverHandleNormalCall(U): starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata
call' = unServerCall call
opResults <- runOps call' serverCQ recvOps timeLimit
case opResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
return $ Left x
Right [OpRecvMessageResult (Just body)] -> do
requestMeta <- serverCallGetMetadata call
grpcDebug $ "got client metadata: " ++ show requestMeta
methodName <- serverCallGetMethodName call
hostName <- serverCallGetHost call
grpcDebug $ "call_details host is: " ++ show hostName
(respBody, respMetadata, details) <- f body requestMeta methodName
let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalResponse
respBody respMetadata status details
respOpsResults <- runOps call' serverCQ respOps timeLimit
case respOpsResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed."
return $ Left x
Right _ -> grpcDebug "serverHandleNormalCall(U): ops done."
>> return (Right ())
x -> error $ "impossible pattern match: " ++ show x

View file

@ -11,19 +11,24 @@ import Control.Monad
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import qualified Data.Map as M import qualified Data.Map as M
import Network.GRPC.LowLevel import Network.GRPC.LowLevel
import qualified Network.GRPC.LowLevel.Client.Unregistered as U
import qualified Network.GRPC.LowLevel.Server.Unregistered as U
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit as HU (Assertion, assertEqual, import Test.Tasty.HUnit as HU (Assertion,
assertFailure, testCase, (@?=)) assertEqual,
assertFailure,
testCase,
(@?=))
lowLevelTests :: TestTree lowLevelTests :: TestTree
lowLevelTests = testGroup "Unit tests of low-level Haskell library" lowLevelTests = testGroup "Unit tests of low-level Haskell library"
[ testGRPCBracket [ testGRPCBracket
, testCompletionQueueCreateDestroy , testCompletionQueueCreateDestroy
, testServerCreateDestroy
, testClientCreateDestroy , testClientCreateDestroy
, testWithServerCall , testClientCall
, testWithClientCall
, testClientTimeoutNoServer , testClientTimeoutNoServer
, testServerCreateDestroy
, testServerCall
, testServerTimeoutNoClient , testServerTimeoutNoClient
-- , testWrongEndpoint -- , testWrongEndpoint
, testPayload , testPayload
@ -39,38 +44,38 @@ testCompletionQueueCreateDestroy =
testCase "Create/destroy CQ" $ withGRPC $ \g -> testCase "Create/destroy CQ" $ withGRPC $ \g ->
withCompletionQueue g nop withCompletionQueue g nop
testServerCreateDestroy :: TestTree
testServerCreateDestroy =
serverOnlyTest "start/stop" [] nop
testClientCreateDestroy :: TestTree testClientCreateDestroy :: TestTree
testClientCreateDestroy = testClientCreateDestroy =
clientOnlyTest "start/stop" nop clientOnlyTest "start/stop" nop
testWithServerCall :: TestTree testClientCall :: TestTree
testWithServerCall = testClientCall =
serverOnlyTest "create/destroy call" [] $ \s -> do
r <- withServerUnregCall s 1 $ const $ return $ Right ()
r @?= Left GRPCIOTimeout
testWithClientCall :: TestTree
testWithClientCall =
clientOnlyTest "create/destroy call" $ \c -> do clientOnlyTest "create/destroy call" $ \c -> do
r <- withClientCall c "foo" 10 $ const $ return $ Right () r <- U.withClientCall c "/foo" 10 $ const $ return $ Right ()
r @?= Right () r @?= Right ()
testClientTimeoutNoServer :: TestTree testClientTimeoutNoServer :: TestTree
testClientTimeoutNoServer = testClientTimeoutNoServer =
clientOnlyTest "request timeout when server DNE" $ \c -> do clientOnlyTest "request timeout when server DNE" $ \c -> do
rm <- clientRegisterMethod c "/foo" Normal rm <- clientRegisterMethod c "/foo" Normal
r <- clientRegisteredRequest c rm 1 "Hello" mempty r <- clientRequest c rm 1 "Hello" mempty
r @?= Left GRPCIOTimeout
testServerCreateDestroy :: TestTree
testServerCreateDestroy =
serverOnlyTest "start/stop" [] nop
testServerCall :: TestTree
testServerCall =
serverOnlyTest "create/destroy call" [] $ \s -> do
r <- U.withServerCall s 1 $ const $ return $ Right ()
r @?= Left GRPCIOTimeout r @?= Left GRPCIOTimeout
testServerTimeoutNoClient :: TestTree testServerTimeoutNoClient :: TestTree
testServerTimeoutNoClient = testServerTimeoutNoClient =
serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 1 mempty $ \_ _ -> r <- serverHandleNormalCall s rm 1 mempty $ \_ _ ->
return ("", mempty, mempty, StatusDetails "details") return ("", mempty, mempty, StatusDetails "details")
r @?= Left GRPCIOTimeout r @?= Left GRPCIOTimeout
@ -85,13 +90,13 @@ testWrongEndpoint =
-- further -- further
client c = do client c = do
rm <- clientRegisterMethod c "/bar" Normal rm <- clientRegisterMethod c "/bar" Normal
r <- clientRegisteredRequest c rm 1 "Hello!" mempty r <- clientRequest c rm 1 "Hello!" mempty
r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded
(StatusDetails "Deadline Exceeded")) (StatusDetails "Deadline Exceeded"))
server s = do server s = do
length (registeredMethods s) @?= 1 length (registeredMethods s) @?= 1
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 10 mempty $ \_ _ -> do r <- serverHandleNormalCall s rm 10 mempty $ \_ _ -> do
return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string")
r @?= Right () r @?= Right ()
@ -108,7 +113,7 @@ testPayload =
clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")] clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")]
client c = do client c = do
rm <- clientRegisterMethod c "/foo" Normal rm <- clientRegisterMethod c "/foo" Normal
clientRegisteredRequest c rm 10 "Hello!" clientMD >>= do clientRequest c rm 10 "Hello!" clientMD >>= do
checkReqRslt $ \NormalRequestResult{..} -> do checkReqRslt $ \NormalRequestResult{..} -> do
rspCode @?= GrpcStatusOk rspCode @?= GrpcStatusOk
rspBody @?= "reply test" rspBody @?= "reply test"
@ -118,7 +123,7 @@ testPayload =
server s = do server s = do
length (registeredMethods s) @?= 1 length (registeredMethods s) @?= 1
let rm = head (registeredMethods s) let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 11 mempty $ \reqBody reqMD -> do r <- serverHandleNormalCall s rm 11 mempty $ \reqBody reqMD -> do
reqBody @?= "Hello!" reqBody @?= "Hello!"
checkMD "Server metadata mismatch" clientMD reqMD checkMD "Server metadata mismatch" clientMD reqMD
return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string")
@ -129,13 +134,13 @@ testPayloadUnregistered =
csTest "unregistered normal request/response" client server [] csTest "unregistered normal request/response" client server []
where where
client c = do client c = do
clientRequest c "/foo" 10 "Hello!" mempty >>= do U.clientRequest c "/foo" 10 "Hello!" mempty >>= do
checkReqRslt $ \NormalRequestResult{..} -> do checkReqRslt $ \NormalRequestResult{..} -> do
rspCode @?= GrpcStatusOk rspCode @?= GrpcStatusOk
rspBody @?= "reply test" rspBody @?= "reply test"
details @?= "details string" details @?= "details string"
server s = do server s = do
r <- serverHandleNormalCall s 11 mempty $ \body _md meth -> do r <- U.serverHandleNormalCall s 11 mempty $ \body _md meth -> do
body @?= "Hello!" body @?= "Hello!"
meth @?= "/foo" meth @?= "/foo"
return ("reply test", mempty, "details string") return ("reply test", mempty, "details string")