mirror of
synced 2025-01-26 19:04:58 +01:00
Improve call-related code (#14)
* projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall
This commit is contained in:
11 changed files with 324 additions and 201 deletions
@ -359,3 +359,15 @@ grpc_call* grpc_channel_create_registered_call_(
propagation_mask, completion_queue, registered_call_handle,
*deadline, reserved);
char* call_details_get_method(grpc_call_details* details){
return details->method;
char* call_details_get_host(grpc_call_details* details){
return details->host;
gpr_timespec* call_details_get_deadline(grpc_call_details* details){
return &(details->deadline);
@ -123,4 +123,10 @@ grpc_call* grpc_channel_create_registered_call_(
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec *deadline, void *reserved);
char* call_details_get_method(grpc_call_details* details);
char* call_details_get_host(grpc_call_details* details);
gpr_timespec* call_details_get_deadline(grpc_call_details* details);
@ -21,22 +21,24 @@ GRPC
-- * Calls
, GRPCMethodType(..)
, RegisteredMethod
, Call
, NormalRequestResult(..)
-- * Server
, ServerConfig(..)
, Server
, ServerRegCall
, ServerUnregCall
, registeredMethods
, withServer
, serverHandleNormalRegisteredCall
, serverHandleNormalCall
, withServerCall
, withServerUnregCall
, withServerRegisteredCall
-- * Client
, ClientConfig(..)
, Client
, ClientCall
, withClient
, clientRegisterMethod
, clientRegisteredRequest
@ -44,7 +46,9 @@ GRPC
, withClientCall
-- * Ops
, runOps
, runClientOps
, runServerRegOps
, runServerUnregOps
, Op(..)
, OpRecvResult(..)
, StatusDetails(..)
@ -4,9 +4,10 @@
module Network.GRPC.LowLevel.Call where
import Control.Monad
import Data.ByteString (ByteString)
import Data.String (IsString)
import Foreign.Marshal.Alloc (free)
import Foreign.Ptr (Ptr, castPtr)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C
@ -14,7 +15,7 @@ import qualified Network.GRPC.Unsafe.Time as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import Network.GRPC.LowLevel.GRPC (grpcDebug)
import Network.GRPC.LowLevel.GRPC (grpcDebug, MetadataMap)
-- | Models the four types of RPC call supported by gRPC. We currently only
-- support the first alternative, and only in a preliminary fashion.
@ -38,78 +39,141 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType,
methodHost :: Host,
methodHandle :: C.CallHandle}
-- | Represents one GRPC call (i.e. request). This type is used on both the
-- client and server. Contains pointers to all the necessary C state needed to
-- send and respond to a call.
-- | Represents one GRPC call (i.e. request) on the client.
-- This is used to associate send/receive 'Op's with a request.
-- There are separate functions for creating these depending on whether the
-- method is registered and whether the call is on the client or server side.
data Call = ClientCall {internalCall :: C.Call}
| ServerCall
{internalCall :: C.Call,
requestMetadataRecv :: (Ptr C.MetadataArray),
optionalPayload :: Maybe (Ptr C.ByteBuffer),
parentPtr :: Maybe (Ptr C.Call),
callDetails :: Maybe (C.CallDetails),
-- ^ used on the server for non-registered calls
--, to identify the endpoint being used.
callDeadline :: Maybe C.CTimeSpecPtr
data ClientCall = ClientCall {internalClientCall :: C.Call}
debugCall :: Call -> IO ()
-- | Represents one registered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to a registered call.
data ServerRegCall = ServerRegCall
{internalServerRegCall :: C.Call,
requestMetadataRecvReg :: Ptr C.MetadataArray,
optionalPayload :: Ptr C.ByteBuffer,
parentPtrReg :: Maybe (Ptr C.Call),
callDeadline :: C.CTimeSpecPtr
serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap
serverRegCallGetMetadata ServerRegCall{..} = do
marray <- peek requestMetadataRecvReg
C.getAllMetadataArray marray
-- | Extract the client request body from the given registered call, if present.
-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library
-- calls the underlying out parameter "optional_payload". I am not sure exactly
-- in what cases it won't be present.
serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString)
serverRegCallGetPayload ServerRegCall{..} = do
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
if rawPtr == nullPtr
then return Nothing
else Just <$> C.copyByteBufferToByteString bb
-- | Represents one unregistered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to an unregistered
-- call.
data ServerUnregCall = ServerUnregCall
{internalServerUnregCall :: C.Call,
requestMetadataRecvUnreg :: Ptr C.MetadataArray,
parentPtrUnreg :: Maybe (Ptr C.Call),
callDetails :: C.CallDetails}
serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap
serverUnregCallGetMetadata ServerUnregCall{..} = do
marray <- peek requestMetadataRecvUnreg
C.getAllMetadataArray marray
serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName
serverUnregCallGetMethodName ServerUnregCall{..} =
MethodName <$> C.callDetailsGetMethod callDetails
debugClientCall :: ClientCall -> IO ()
{-# INLINE debugClientCall #-}
#ifdef DEBUG
debugCall (ClientCall (C.Call ptr)) =
debugClientCall (ClientCall (C.Call ptr)) =
grpcDebug $ "debugCall: client call: " ++ (show ptr)
debugCall call@(ServerCall (C.Call ptr) _ _ _ _ _) = do
grpcDebug $ "debugCall: server call: " ++ (show ptr)
grpcDebug $ "debugCall: metadata ptr: " ++ show (requestMetadataRecv call)
metadataArr <- peek (requestMetadataRecv call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugCall: metadata received: " ++ (show metadata)
forM_ (optionalPayload call) $ \payloadPtr -> do
grpcDebug $ "debugCall: payload ptr: " ++ show payloadPtr
payload <- peek payloadPtr
bs <- C.copyByteBufferToByteString payload
grpcDebug $ "debugCall: payload contents: " ++ show bs
forM_ (parentPtr call) $ \parentPtr' -> do
grpcDebug $ "debugCall: parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugCall: parent: " ++ show parent
forM_ (callDetails call) $ \(C.CallDetails callDetailsPtr) -> do
grpcDebug $ "debugCall: callDetails ptr: " ++ show callDetailsPtr
--TODO: need functions for getting data out of call_details.
forM_ (callDeadline call) $ \timespecptr -> do
grpcDebug $ "debugCall: deadline ptr: " ++ show timespecptr
timespec <- peek timespecptr
grpcDebug $ "debugCall: deadline: " ++ show (C.timeSpec timespec)
{-# INLINE debugCall #-}
debugCall = const $ return ()
debugClientCall = const $ return ()
-- | Destroys a 'Call'.
destroyCall :: Call -> IO ()
destroyCall ClientCall{..} = do
grpcDebug "Destroying client-side call object."
C.grpcCallDestroy internalCall
destroyCall call@ServerCall{..} = do
grpcDebug "destroyCall: entered."
debugCall call
grpcDebug $ "Destroying server-side call object: " ++ show internalCall
C.grpcCallDestroy internalCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "destroying optional payload" ++ show optionalPayload
forM_ optionalPayload C.destroyReceivingByteBuffer
grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtr free
grpcDebug $ "destroying call details" ++ show callDetails
forM_ callDetails C.destroyCallDetails
grpcDebug $ "destroying deadline." ++ show callDeadline
forM_ callDeadline C.timespecDestroy
debugServerRegCall :: ServerRegCall -> IO ()
#ifdef DEBUG
debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do
grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr)
grpcDebug $ "debugServerRegCall: metadata ptr: "
++ show (requestMetadataRecvReg call)
metadataArr <- peek (requestMetadataRecvReg call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata)
grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call)
payload <- peek (optionalPayload call)
bs <- C.copyByteBufferToByteString payload
grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs
forM_ (parentPtrReg call) $ \parentPtr' -> do
grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerRegCall: parent: " ++ show parent
grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call)
timespec <- peek (callDeadline call)
grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec)
{-# INLINE debugServerRegCall #-}
debugServerRegCall = const $ return ()
_nowarn_unused :: a
_nowarn_unused =
castPtr `undefined`
(peek :: Ptr Int -> IO Int) `undefined`
debugServerUnregCall :: ServerUnregCall -> IO ()
#ifdef DEBUG
debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do
grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr)
grpcDebug $ "debugServerUnregCall: metadata ptr: "
++ show (requestMetadataRecvUnreg call)
metadataArr <- peek (requestMetadataRecvUnreg call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata)
forM_ (parentPtrUnreg call) $ \parentPtr' -> do
grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerRegCall: parent: " ++ show parent
grpcDebug $ "debugServerUnregCall: callDetails ptr: "
++ show (callDetails call)
--TODO: need functions for getting data out of call_details.
{-# INLINE debugServerUnregCall #-}
debugServerUnregCall = const $ return ()
destroyClientCall :: ClientCall -> IO ()
destroyClientCall ClientCall{..} = do
grpcDebug "Destroying client-side call object."
C.grpcCallDestroy internalClientCall
destroyServerRegCall :: ServerRegCall -> IO ()
destroyServerRegCall call@ServerRegCall{..} = do
grpcDebug "destroyServerRegCall: entered."
debugServerRegCall call
grpcDebug $ "Destroying server-side call object: "
++ show internalServerRegCall
C.grpcCallDestroy internalServerRegCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg
C.metadataArrayDestroy requestMetadataRecvReg
grpcDebug $ "destroying optional payload" ++ show optionalPayload
C.destroyReceivingByteBuffer optionalPayload
grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg
forM_ parentPtrReg free
grpcDebug $ "destroying deadline." ++ show callDeadline
C.timespecDestroy callDeadline
destroyServerUnregCall :: ServerUnregCall -> IO ()
destroyServerUnregCall call@ServerUnregCall{..} = do
grpcDebug "destroyServerUnregCall: entered."
debugServerUnregCall call
grpcDebug $ "Destroying server-side call object: "
++ show internalServerUnregCall
C.grpcCallDestroy internalServerUnregCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg
C.metadataArrayDestroy requestMetadataRecvUnreg
grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg
forM_ parentPtrUnreg free
grpcDebug $ "destroying call details: " ++ show callDetails
C.destroyCallDetails callDetails
@ -3,6 +3,7 @@
module Network.GRPC.LowLevel.Client where
import Control.Exception (bracket, finally)
import Control.Monad (join)
import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C
@ -63,7 +64,7 @@ clientRegisterMethod _ _ _ _ = error "Streaming methods not yet implemented."
-- Returns 'Left' if the CQ is shutting down or if the job to create a call
-- timed out.
clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ClientCall)
clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do
let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though.
C.withDeadlineSeconds timeout $ \deadline -> do
@ -74,7 +75,7 @@ clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do
-- by switching to ExceptT IO.
-- | Handles safe creation and cleanup of a client call
withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds
-> (Call
-> (ClientCall
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientRegisteredCall client regmethod timeout f = do
@ -83,7 +84,7 @@ withClientRegisteredCall client regmethod timeout f = do
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientRegisteredCall: destroying."
>> destroyCall c
>> destroyClientCall c
-- | Create a call on the client for an endpoint without using the
-- method registration machinery. In practice, we'll probably only use the
@ -94,7 +95,7 @@ clientCreateCall :: Client
-> Host
-- ^ The host.
-> TimeoutSeconds
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ClientCall)
clientCreateCall Client{..} method host timeout = do
let parentCall = C.Call nullPtr
C.withDeadlineSeconds timeout $ \deadline -> do
@ -102,7 +103,7 @@ clientCreateCall Client{..} method host timeout = do
clientCQ method host deadline
withClientCall :: Client -> MethodName -> Host -> TimeoutSeconds
-> (Call -> IO (Either GRPCIOError a))
-> (ClientCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientCall client method host timeout f = do
createResult <- clientCreateCall client method host timeout
@ -110,7 +111,7 @@ withClientCall client method host timeout f = do
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientCall: destroying."
>> destroyCall c
>> destroyClientCall c
data NormalRequestResult = NormalRequestResult
@ -121,26 +122,25 @@ data NormalRequestResult = NormalRequestResult
deriving (Show, Eq)
-- | Function for assembling call result when the 'MethodType' is 'Normal'.
compileNormalRequestResults :: [OpRecvResult] -> NormalRequestResult
compileNormalRequestResults :: [OpRecvResult]
-> Either GRPCIOError NormalRequestResult
--TODO: consider using more precise type instead of match.
-- Whether we do so depends on whether this layer of abstraction is supposed
-- to be a safe interface to the gRPC C core library, or something that makes
-- core use cases easy.
[OpRecvInitialMetadataResult m,
OpRecvMessageResult body,
OpRecvMessageResult (Just body),
OpRecvStatusOnClientResult m2 status details]
= NormalRequestResult body (Just m) m2 status (StatusDetails details)
= Right $ NormalRequestResult body (Just m) m2 status
(StatusDetails details)
-- TODO: it seems registered request responses on the server
-- don't send initial metadata. Hence the 'Maybe'. Investigate.
[OpRecvMessageResult body,
[OpRecvMessageResult (Just body),
OpRecvStatusOnClientResult m2 status details]
= NormalRequestResult body Nothing m2 status (StatusDetails details)
compileNormalRequestResults _ =
--TODO: impossible case should be enforced by more precise types.
error "non-normal request input to compileNormalRequestResults."
= Right $ NormalRequestResult body Nothing m2 status (StatusDetails details)
compileNormalRequestResults x =
case extractStatus x of
Nothing -> Left GRPCIOUnknownError
Just (OpRecvStatusOnClientResult _ status details) ->
Left (GRPCIOBadStatusCode status (StatusDetails details))
-- | Make a request of the given method with the given body. Returns the
-- server's response. TODO: This is preliminary until we figure out how many
@ -162,37 +162,28 @@ clientRegisteredRequest :: Client
-> IO (Either GRPCIOError NormalRequestResult)
clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
timeLimit body meta =
case methodType of
fmap join $ case methodType of
Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do
grpcDebug "clientRegisteredRequest: created call."
debugCall call
--TODO: doing one op at a time to debug. Some were hanging.
let op1 = [OpSendInitialMetadata meta]
res1 <- runOps call clientCQ op1 timeLimit
grpcDebug $ "finished res1: " ++ show res1
let op2 = [OpSendMessage body]
res2 <- runOps call clientCQ op2 timeLimit
grpcDebug $ "finished res2: " ++ show res2
let op3 = [OpSendCloseFromClient]
res3 <- runOps call clientCQ op3 timeLimit
grpcDebug $ "finished res3: " ++ show res3
let op4 = [OpRecvMessage]
res4 <- runOps call clientCQ op4 timeLimit
grpcDebug $ "finished res4: " ++ show res4
let op5 = [OpRecvStatusOnClient]
res5 <- runOps call clientCQ op5 timeLimit
grpcDebug $ "finished res5: " ++ show res5
let results = do
r1 <- res1
r2 <- res2
r3 <- res3
r4 <- res4
r5 <- res5
return $ r1 ++ r2 ++ r3 ++ r4 ++ r5
case results of
Left x -> return $ Left x
Right rs -> return $
Right $ compileNormalRequestResults rs
debugClientCall call
-- NOTE: sendOps and recvOps *must* be in separate batches or
-- the client hangs when the server can't be reached.
let sendOps = [OpSendInitialMetadata meta
, OpSendMessage body
, OpSendCloseFromClient]
sendRes <- runClientOps call clientCQ sendOps timeLimit
case sendRes of
Left x -> do grpcDebug "clientRegisteredRequest: batch error."
return $ Left x
Right rs -> do
let recvOps = [OpRecvMessage, OpRecvStatusOnClient]
recvRes <- runClientOps call clientCQ recvOps timeLimit
case recvRes of
Left x -> do
grpcDebug "clientRegisteredRequest: batch error."
return $ Left x
Right rs' -> do
return $ Right $ compileNormalRequestResults (rs ++ rs')
_ -> error "Streaming methods not yet implemented."
-- | Makes a normal (non-streaming) request without needing to register a method
@ -210,16 +201,16 @@ clientRequest :: Client
-- ^ Request metadata.
-> IO (Either GRPCIOError NormalRequestResult)
clientRequest client@(Client{..}) (MethodName method) (Host host)
timeLimit body meta = do
timeLimit body meta =
fmap join $
withClientCall client (MethodName method) (Host host) timeLimit $ \call -> do
let ops = clientNormalRequestOps body meta
results <- runOps call clientCQ ops timeLimit
results <- runClientOps call clientCQ ops timeLimit
grpcDebug "clientRequest: ops ran."
case results of
Left x -> return $ Left x
Right rs -> return $ Right $ compileNormalRequestResults rs
clientNormalRequestOps :: ByteString -> MetadataMap -> [Op]
clientNormalRequestOps body metadata =
[OpSendInitialMetadata metadata,
@ -224,7 +224,8 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask
-> CompletionQueue -> C.CallHandle
-> C.CTimeSpecPtr -> IO (Either GRPCIOError Call)
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
chan parent mask cq@CompletionQueue{..} handle deadline =
withPermission Push cq $ do
@ -238,7 +239,7 @@ channelCreateRegisteredCall
channelCreateCall :: C.Channel -> C.Call -> C.PropagationMask -> CompletionQueue
-> MethodName -> Host -> C.CTimeSpecPtr
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ClientCall)
chan parent mask cq@CompletionQueue{..} (MethodName methodName) (Host host)
deadline =
@ -250,7 +251,7 @@ channelCreateCall
-- | Create the call object to handle a registered call.
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> RegisteredMethod
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ServerRegCall)
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} =
withPermission Push cq $ do
@ -280,9 +281,8 @@ serverRequestRegisteredCall
return $ Left x
Right () -> do
rawCall <- peek callPtr
let assembledCall = ServerCall rawCall metadataArrayPtr
(Just bbPtr) Nothing Nothing
(Just deadline)
let assembledCall = ServerRegCall rawCall metadataArrayPtr
bbPtr Nothing deadline
return $ Right assembledCall
-- TODO: see TODO for failureCleanup in serverRequestCall.
where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do
@ -294,7 +294,7 @@ serverRequestRegisteredCall
free bbPtr
serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ServerUnregCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc
@ -321,12 +321,10 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit =
return $ Left x
Right () -> do
rawCall <- peek callPtr
let call = ServerCall rawCall
(Just callDetails)
let call = ServerUnregCall rawCall
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random
@ -10,12 +10,20 @@ import Control.Monad.Except (ExceptT(..), runExceptT, throwError,
import Control.Exception
import qualified Data.ByteString as B
import qualified Data.Map as M
import Data.String (IsString)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
#ifdef DEBUG
import GHC.Conc (myThreadId)
type MetadataMap = M.Map B.ByteString B.ByteString
newtype StatusDetails = StatusDetails B.ByteString deriving (Show, Eq, IsString)
-- | Functions as a proof that the gRPC core has been started. The gRPC core
-- must be initialized to create any gRPC state, so this is a requirement for
-- the server and client create/start functions.
@ -39,6 +47,7 @@ data GRPCIOError = GRPCIOCallError C.CallError
-- ^ Thrown if a 'CompletionQueue' fails to shut down in a
-- reasonable amount of time.
| GRPCIOUnknownError
| GRPCIOBadStatusCode C.StatusCode StatusDetails
deriving (Show, Eq)
throwIfCallError :: C.CallError -> Either GRPCIOError ()
@ -14,7 +14,7 @@ import Foreign.Marshal.Alloc (free, malloc,
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek, poke)
import qualified Network.GRPC.Unsafe as C ()
import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C
@ -23,10 +23,6 @@ import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
type MetadataMap = M.Map B.ByteString B.ByteString
newtype StatusDetails = StatusDetails B.ByteString deriving (Show, Eq, IsString)
-- | Sum describing all possible send and receive operations that can be batched
-- and executed by gRPC. Usually these are processed in a handful of
-- combinations depending on the 'MethodType' of the call being run.
@ -146,7 +142,9 @@ withOpArray n = bracket (C.opArrayCreate n)
-- | Container holding GC-managed results for 'Op's which receive data.
data OpRecvResult =
OpRecvInitialMetadataResult MetadataMap
| OpRecvMessageResult B.ByteString
| OpRecvMessageResult (Maybe B.ByteString)
-- ^ If the client or server dies, we might not receive a response body, in
-- which case this will be 'Nothing'.
| OpRecvStatusOnClientResult MetadataMap C.StatusCode B.ByteString
| OpRecvCloseOnServerResult Bool -- ^ True if call was cancelled.
deriving (Eq, Show)
@ -162,11 +160,12 @@ resultFromOpContext (OpRecvInitialMetadataContext pmetadata) = do
return $ Just $ OpRecvInitialMetadataResult metadataMap
resultFromOpContext (OpRecvMessageContext pbb) = do
grpcDebug "resultFromOpContext: OpRecvMessageContext"
bb <- peek pbb
grpcDebug "resultFromOpContext: bytebuffer peeked."
bs <- C.copyByteBufferToByteString bb
grpcDebug "resultFromOpContext: bb copied."
return $ Just $ OpRecvMessageResult bs
bb@(C.ByteBuffer bbptr) <- peek pbb
if bbptr == nullPtr
then return $ Just $ OpRecvMessageResult Nothing
else do bs <- C.copyByteBufferToByteString bb
grpcDebug "resultFromOpContext: bb copied."
return $ Just $ OpRecvMessageResult (Just bs)
resultFromOpContext (OpRecvStatusOnClientContext pmetadata pcode pstr) = do
grpcDebug "resultFromOpContext: OpRecvStatusOnClientContext"
metadata <- peek pmetadata
@ -186,23 +185,18 @@ resultFromOpContext _ = do
--TODO: the list of 'Op's type is less specific than it could be. There are only
-- a few different sequences of 'Op's we will see in practice. Once we figure
-- out what those are, we should create a more specific sum type. This will also
-- allow us to make a more specific sum type to replace @[OpRecvResult]@, too.
-- out what those are, we should create a more specific sum type. However, since
-- ops can fail, the list of 'OpRecvResult' returned by 'runOps' can vary in
-- their contents and are perhaps less amenable to simplification.
-- In the meantime, from looking at the core tests, it looks like it is safe to
-- say that we always get a GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use
-- the same 'Op' twice in the same batch, so we might want to change the list to
-- a set. I don't think order matters within a batch. Need to check.
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the given number of
-- seconds have elapsed.
runOps :: Call
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
runOps :: C.Call
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-> TimeoutSeconds
-- ^ How long to block waiting for the tag to appear on the queue.
-- If we time out, the result of this action will be
-- @CallBatchError BatchTimeout@.
-> IO (Either GRPCIOError [OpRecvResult])
runOps call cq ops timeLimit =
let l = length ops in
@ -212,7 +206,7 @@ runOps call cq ops timeLimit =
grpcDebug $ "runOps: allocated op contexts: " ++ show contexts
sequence_ $ zipWith (setOpArray opArray) [0..l-1] contexts
tag <- newTag cq
callError <- startBatch cq (internalCall call) opArray l tag
callError <- startBatch cq call opArray l tag
grpcDebug $ "runOps: called start_batch. callError: "
++ (show callError)
case callError of
@ -226,5 +220,44 @@ runOps call cq ops timeLimit =
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
Left err -> return $ Left err
_nowarn_unused :: a
_nowarn_unused = undefined nullPtr
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the given number of
-- seconds have elapsed.
-- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we
-- could try to limit the input 'Op's more appropriately. E.g., we don't use
-- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC
-- handles that for us.
runServerRegOps :: ServerRegCall
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-- ^ The list of 'Op's to execute.
-> TimeoutSeconds
-- ^ How long to block waiting for the tag to appear on the
--queue. If we time out, the result of this action will be
-- @CallBatchError BatchTimeout@.
-> IO (Either GRPCIOError [OpRecvResult])
runServerRegOps = runOps . internalServerRegCall
runServerUnregOps :: ServerUnregCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runServerUnregOps = runOps . internalServerUnregCall
-- | Like 'runServerOps', but for client-side calls.
runClientOps :: ClientCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runClientOps = runOps . internalClientCall
extractStatus :: [OpRecvResult] -> Maybe OpRecvResult
extractStatus [] = Nothing
extractStatus (res@(OpRecvStatusOnClientResult _ _ _):_) = Just res
extractStatus (_:xs) = extractStatus xs
@ -6,9 +6,7 @@ import Control.Concurrent (threadDelay)
import Control.Exception (bracket, finally)
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.Map as M
import Foreign.Ptr (nullPtr)
import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
@ -25,9 +23,6 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
-- | Wraps various gRPC state needed to run a server.
data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue,
registeredMethods :: [RegisteredMethod]}
@ -123,12 +118,12 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
-- | Create a 'Call' with which to wait for the invocation of a registered
-- method.
serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
-> IO (Either GRPCIOError Call)
-> IO (Either GRPCIOError ServerRegCall)
serverCreateRegisteredCall Server{..} rm timeLimit =
serverRequestRegisteredCall internalServer serverCQ timeLimit rm
withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
-> (Call
-> (ServerRegCall
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerRegisteredCall server regmethod timeout f = do
@ -137,23 +132,23 @@ withServerRegisteredCall server regmethod timeout f = do
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerRegisteredCall: destroying."
>> destroyCall c
>> destroyServerRegCall c
serverCreateCall :: Server -> TimeoutSeconds
-> IO (Either GRPCIOError Call)
serverCreateCall Server{..} timeLimit =
serverCreateUnregCall :: Server -> TimeoutSeconds
-> IO (Either GRPCIOError ServerUnregCall)
serverCreateUnregCall Server{..} timeLimit =
serverRequestCall internalServer serverCQ timeLimit
withServerCall :: Server -> TimeoutSeconds
-> (Call -> IO (Either GRPCIOError a))
withServerUnregCall :: Server -> TimeoutSeconds
-> (ServerUnregCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerCall server timeout f = do
createResult <- serverCreateCall server timeout
withServerUnregCall server timeout f = do
createResult <- serverCreateUnregCall server timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerCall: destroying."
>> destroyCall c
>> destroyServerUnregCall c
-- | Sequence of 'Op's needed to receive a normal (non-streaming) call.
serverOpsGetNormalCall :: MetadataMap -> [Op]
@ -211,19 +206,19 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do
-- anyway.
withServerRegisteredCall s rm timeLimit $ \call -> do
grpcDebug "serverHandleNormalRegisteredCall: starting batch."
debugCall call
case optionalPayload call of
Nothing -> error "Impossible: not a registered call." --TODO: better types
Just payloadPtr -> do
payload <- peek payloadPtr
requestBody <- C.copyByteBufferToByteString payload
metadataArray <- peek $ requestMetadataRecv call
metadata <- C.getAllMetadataArray metadataArray
(respBody, initMeta, trailingMeta, details) <- f requestBody metadata
debugServerRegCall call
payload <- serverRegCallGetPayload call
case payload of
--TODO: what should we do with an empty payload? Have the handler take
-- @Maybe ByteString@? Need to figure out when/why payload would be empty.
Nothing -> error "serverHandleNormalRegisteredCall: payload empty."
Just requestBody -> do
requestMeta <- serverRegCallGetMetadata call
(respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta
let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalRegisteredResponse
respBody initMeta trailingMeta status details
respOpsResults <- runOps call serverCQ respOps timeLimit
respOpsResults <- runServerRegOps call serverCQ respOps timeLimit
grpcDebug "serverHandleNormalRegisteredCall: finished response ops."
case respOpsResults of
Left x -> return $ Left x
@ -235,25 +230,28 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do
serverHandleNormalCall :: Server -> TimeoutSeconds
-> MetadataMap
-- ^ Initial server metadata.
-> (ByteString -> MetadataMap
-> (ByteString -> MetadataMap -> MethodName
-> IO (ByteString, MetadataMap, StatusDetails))
-- ^ Handler function takes a request body and
-- metadata and returns a response body and metadata.
-> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
withServerCall s timeLimit $ \call -> do
withServerUnregCall s timeLimit $ \call -> do
grpcDebug "serverHandleNormalCall: starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata
opResults <- runOps call serverCQ recvOps timeLimit
opResults <- runServerUnregOps call serverCQ recvOps timeLimit
case opResults of
Left x -> return $ Left x
Right [OpRecvMessageResult body] -> do
--TODO: we need to get client metadata
(respBody, respMetadata, details) <- f body M.empty
Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting"
return $ Left x
Right [OpRecvMessageResult (Just body)] -> do
requestMeta <- serverUnregCallGetMetadata call
grpcDebug $ "got client metadata: " ++ show requestMeta
methodName <- serverUnregCallGetMethodName call
(respBody, respMetadata, details) <- f body requestMeta methodName
let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalResponse
respBody respMetadata status details
respOpsResults <- runOps call serverCQ respOps timeLimit
respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit
case respOpsResults of
Left x -> do grpcDebug "serverHandleNormalCall: resp failed."
return $ Left x
@ -263,3 +263,9 @@ castPeek p = peek (castPtr p)
`MetadataArray', id `Ptr ByteBuffer', `CompletionQueue',
`CompletionQueue',unTag `Tag'}
-> `CallError'#}
{#fun call_details_get_method as ^ {`CallDetails'} -> `String'#}
{#fun call_details_get_host as ^ {`CallDetails'} -> `String'#}
{#fun call_details_get_deadline as ^ {`CallDetails'} -> `CTimeSpec' peek* #}
@ -84,8 +84,10 @@ payloadLowLevelServerUnregistered :: TestServer
payloadLowLevelServerUnregistered = TestServer $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> do
result <- serverHandleNormalCall server 11 M.empty $
\_reqBody _reqMeta -> return ("reply test", M.empty,
StatusDetails "details string")
\reqBody _reqMeta reqMethod -> do
reqBody @?= "Hello!"
reqMethod @?= "/foo"
return ("reply test", M.empty, StatusDetails "details string")
case result of
Left x -> error $ show x
Right _ -> return ()
@ -115,7 +117,7 @@ testServerUnregisteredAwaitNoClient =
let conf = ServerConfig "localhost" 50051 []
withServer grpc conf $ \server -> do
result <- serverHandleNormalCall server 10 M.empty $
\_ _ -> return ("", M.empty, StatusDetails "")
\_ _ _ -> return ("", M.empty, StatusDetails "")
case result of
Left err -> error $ show err
Right _ -> return ()
@ -135,7 +137,7 @@ testWithServerCall =
grpcTest "Server - Create/destroy call" $ \grpc -> do
let conf = ServerConfig "localhost" 50051 []
withServer grpc conf $ \server -> do
result <- withServerCall server 1 $ const $ return $ Right ()
result <- withServerUnregCall server 1 $ const $ return $ Right ()
result @?= Left GRPCIOTimeout
testWithClientCall :: TestTree
Add table
Reference in a new issue