Rename reg operations in all modules; use qualified imports whenever selecting unregistered variants

This commit is contained in:
Joel Stanley 2016-06-08 14:38:01 -05:00
parent b08ae78dd1
commit 8069ebba07
15 changed files with 240 additions and 223 deletions

View file

@ -1,19 +1,20 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
{-# OPTIONS_GHC -fno-warn-unused-binds #-}
import Control.Monad
import Network.GRPC.LowLevel
import qualified Network.GRPC.LowLevel.Client.Unregistered as U
echoMethod = MethodName "/echo.Echo/DoEcho"
unregistered c = do
clientRequest c echoMethod 1 "hi" mempty
U.clientRequest c echoMethod 1 "hi" mempty
registered c = do
meth <- clientRegisterMethod c echoMethod Normal
clientRegisteredRequest c meth 1 "hi" mempty
clientRequest c meth 1 "hi" mempty
run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c ->
f c >>= \case

View file

@ -33,7 +33,7 @@ regMain = withGRPC $ \grpc -> do
withServer grpc (ServerConfig "localhost" 50051 methods) $ \server ->
forever $ do
let method = head (registeredMethods server)
result <- serverHandleNormalRegisteredCall server method 15 serverMeta $
result <- serverHandleNormalCall server method 15 serverMeta $
\reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta,
StatusDetails "")
case result of
@ -43,7 +43,7 @@ regMain = withGRPC $ \grpc -> do
-- | loop to fork n times
regLoop :: Server -> RegisteredMethod -> IO ()
regLoop server method = forever $ do
result <- serverHandleNormalRegisteredCall server method 15 serverMeta $
result <- serverHandleNormalCall server method 15 serverMeta $
\reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta,
StatusDetails "")
case result of

View file

@ -43,6 +43,7 @@ library
Network.GRPC.Unsafe
Network.GRPC.LowLevel
Network.GRPC.LowLevel.Server.Unregistered
Network.GRPC.LowLevel.Client.Unregistered
other-modules:
Network.GRPC.LowLevel.CompletionQueue
Network.GRPC.LowLevel.CompletionQueue.Internal
@ -54,7 +55,6 @@ library
Network.GRPC.LowLevel.Call
Network.GRPC.LowLevel.Call.Unregistered
Network.GRPC.LowLevel.Client
Network.GRPC.LowLevel.Client.Unregistered
extra-libraries:
grpc
includes:

View file

@ -29,11 +29,11 @@ GRPC
-- * Server
, ServerConfig(..)
, Server
, ServerRegCall
, ServerCall
, registeredMethods
, withServer
, serverHandleNormalRegisteredCall
, withServerRegisteredCall
, serverHandleNormalCall
, withServerCall
-- * Client
, ClientConfig(..)
@ -43,13 +43,12 @@ GRPC
, clientConnectivity
, withClient
, clientRegisterMethod
, clientRegisteredRequest
, clientRequest
, withClientCall
-- * Ops
, runClientOps
, runServerRegOps
, runServerOps
, Op(..)
, OpRecvResult(..)
@ -60,7 +59,6 @@ import Network.GRPC.LowLevel.Server
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.Op
import Network.GRPC.LowLevel.Client
import Network.GRPC.LowLevel.Client.Unregistered
import Network.GRPC.LowLevel.Call
import Network.GRPC.Unsafe (ConnectivityState(..))

View file

@ -54,32 +54,31 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType,
-- | Represents one GRPC call (i.e. request) on the client.
-- This is used to associate send/receive 'Op's with a request.
data ClientCall = ClientCall {internalClientCall :: C.Call}
data ClientCall = ClientCall { unClientCall :: C.Call }
-- | Represents one registered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to a registered call.
data ServerRegCall = ServerRegCall
{internalServerRegCall :: C.Call,
requestMetadataRecvReg :: Ptr C.MetadataArray,
optionalPayload :: Ptr C.ByteBuffer,
parentPtrReg :: Maybe (Ptr C.Call),
callDeadline :: C.CTimeSpecPtr
-- | Represents one registered GRPC call on the server. Contains pointers to all
-- the C state needed to respond to a registered call.
data ServerCall = ServerCall
{ unServerCall :: C.Call,
requestMetadataRecv :: Ptr C.MetadataArray,
optionalPayload :: Ptr C.ByteBuffer,
parentPtr :: Maybe (Ptr C.Call),
callDeadline :: C.CTimeSpecPtr
}
serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap
serverRegCallGetMetadata ServerRegCall{..} = do
marray <- peek requestMetadataRecvReg
serverCallGetMetadata :: ServerCall -> IO MetadataMap
serverCallGetMetadata ServerCall{..} = do
marray <- peek requestMetadataRecv
C.getAllMetadataArray marray
-- | Extract the client request body from the given registered call, if present.
-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library
-- calls the underlying out parameter "optional_payload". I am not sure exactly
-- in what cases it won't be present. The C++ library checks a
-- has_request_payload_ bool and passes in nullptr to request_registered_call
-- if the bool is false, so we may be able to do the payload present/absent
-- check earlier.
serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString)
serverRegCallGetPayload ServerRegCall{..} = do
-- | Extract the client request body from the given call, if present. TODO: the
-- reason this returns @Maybe ByteString@ is because the gRPC library calls the
-- underlying out parameter "optional_payload". I am not sure exactly in what
-- cases it won't be present. The C++ library checks a has_request_payload_ bool
-- and passes in nullptr to request_registered_call if the bool is false, so we
-- may be able to do the payload present/absent check earlier.
serverCallGetPayload :: ServerCall -> IO (Maybe ByteString)
serverCallGetPayload ServerCall{..} = do
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
if rawPtr == nullPtr
then return Nothing
@ -94,48 +93,47 @@ debugClientCall (ClientCall (C.Call ptr)) =
debugClientCall = const $ return ()
#endif
debugServerRegCall :: ServerRegCall -> IO ()
debugServerCall :: ServerCall -> IO ()
#ifdef DEBUG
debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do
grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr)
grpcDebug $ "debugServerRegCall: metadata ptr: "
++ show (requestMetadataRecvReg call)
metadataArr <- peek (requestMetadataRecvReg call)
debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do
grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr)
grpcDebug $ "debugServerCall(R): metadata ptr: "
++ show (requestMetadataRecv call)
metadataArr <- peek (requestMetadataRecv call)
metadata <- C.getAllMetadataArray metadataArr
grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata)
grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call)
grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata)
grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call)
payload <- peek (optionalPayload call)
bs <- C.copyByteBufferToByteString payload
grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs
forM_ (parentPtrReg call) $ \parentPtr' -> do
grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr'
grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs
forM_ (parentPtr call) $ \parentPtr' -> do
grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr'
(C.Call parent) <- peek parentPtr'
grpcDebug $ "debugServerRegCall: parent: " ++ show parent
grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call)
grpcDebug $ "debugServerCall(R): parent: " ++ show parent
grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call)
timespec <- peek (callDeadline call)
grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec)
grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec)
#else
{-# INLINE debugServerRegCall #-}
debugServerRegCall = const $ return ()
{-# INLINE debugServerCall #-}
debugServerCall = const $ return ()
#endif
destroyClientCall :: ClientCall -> IO ()
destroyClientCall ClientCall{..} = do
grpcDebug "Destroying client-side call object."
C.grpcCallDestroy internalClientCall
C.grpcCallDestroy unClientCall
destroyServerRegCall :: ServerRegCall -> IO ()
destroyServerRegCall call@ServerRegCall{..} = do
grpcDebug "destroyServerRegCall: entered."
debugServerRegCall call
grpcDebug $ "Destroying server-side call object: "
++ show internalServerRegCall
C.grpcCallDestroy internalServerRegCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg
C.metadataArrayDestroy requestMetadataRecvReg
destroyServerCall :: ServerCall -> IO ()
destroyServerCall call@ServerCall{..} = do
grpcDebug "destroyServerCall(R): entered."
debugServerCall call
grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
C.grpcCallDestroy unServerCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "destroying optional payload" ++ show optionalPayload
C.destroyReceivingByteBuffer optionalPayload
grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg
forM_ parentPtrReg free
grpcDebug $ "freeing parentPtr: " ++ show parentPtr
forM_ parentPtr free
grpcDebug $ "destroying deadline." ++ show callDeadline
C.timespecDestroy callDeadline

View file

@ -3,21 +3,19 @@
module Network.GRPC.LowLevel.Call.Unregistered where
import Control.Monad
import Foreign.Marshal.Alloc (free)
import Foreign.Ptr (Ptr)
import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Metadata as C
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug)
import Foreign.Marshal.Alloc (free)
import Foreign.Ptr (Ptr)
import Foreign.Storable (peek)
import Network.GRPC.LowLevel.Call (Host (..), MethodName (..))
import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Metadata as C
-- | Represents one unregistered GRPC call on the server.
-- Contains pointers to all the C state needed to respond to an unregistered
-- call.
data ServerCall = ServerCall
{ internalServerCall :: C.Call
{ unServerCall :: C.Call
, requestMetadataRecv :: Ptr C.MetadataArray
, parentPtr :: Maybe (Ptr C.Call)
, callDetails :: C.CallDetails
@ -61,9 +59,8 @@ destroyServerCall :: ServerCall -> IO ()
destroyServerCall call@ServerCall{..} = do
grpcDebug "destroyServerCall(U): entered."
debugServerCall call
grpcDebug $ "Destroying server-side call object: "
++ show internalServerCall
C.grpcCallDestroy internalServerCall
grpcDebug $ "Destroying server-side call object: " ++ show unServerCall
C.grpcCallDestroy unServerCall
grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv
C.metadataArrayDestroy requestMetadataRecv
grpcDebug $ "freeing parentPtr: " ++ show parentPtr

View file

@ -58,7 +58,7 @@ clientConnectivity Client{..} =
C.grpcChannelCheckConnectivityState clientChannel False
-- | Register a method on the client so that we can call it with
-- 'clientRegisteredRequest'.
-- 'clientRequest'.
clientRegisterMethod :: Client
-> MethodName
-- ^ method name, e.g. "/foo"
@ -74,27 +74,30 @@ clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented."
-- | Create a new call on the client for a registered method.
-- Returns 'Left' if the CQ is shutting down or if the job to create a call
-- timed out.
clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds
-> IO (Either GRPCIOError ClientCall)
clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do
clientCreateCall :: Client
-> RegisteredMethod
-> TimeoutSeconds
-> IO (Either GRPCIOError ClientCall)
clientCreateCall Client{..} RegisteredMethod{..} timeout = do
let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though.
C.withDeadlineSeconds timeout $ \deadline -> do
channelCreateRegisteredCall clientChannel parentCall C.propagateDefaults
clientCQ methodHandle deadline
channelCreateCall clientChannel parentCall C.propagateDefaults
clientCQ methodHandle deadline
-- TODO: the error-handling refactor made this quite ugly. It could be fixed
-- by switching to ExceptT IO.
-- | Handles safe creation and cleanup of a client call
withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds
-> (ClientCall
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientRegisteredCall client regmethod timeout f = do
createResult <- clientCreateRegisteredCall client regmethod timeout
withClientCall :: Client
-> RegisteredMethod
-> TimeoutSeconds
-> (ClientCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withClientCall client regmethod timeout f = do
createResult <- clientCreateCall client regmethod timeout
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientRegisteredCall: destroying."
where logDestroy c = grpcDebug "withClientCall(R): destroying."
>> destroyClientCall c
data NormalRequestResult = NormalRequestResult
@ -131,25 +134,24 @@ compileNormalRequestResults x =
-- server's response. TODO: This is preliminary until we figure out how many
-- different variations on sending request ops will be needed for full gRPC
-- functionality.
clientRegisteredRequest :: Client
-> RegisteredMethod
-> TimeoutSeconds
-- ^ Timeout of both the grpc_call and the
-- max time to wait for the completion of the batch.
-- TODO: I think we will need to decouple the
-- lifetime of the call from the queue deadline once
-- we expose functionality for streaming calls, where
-- one call object persists across many batches.
-> ByteString
-- ^ The body of the request.
-> MetadataMap
-- ^ Metadata to send with the request.
-> IO (Either GRPCIOError NormalRequestResult)
clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
timeLimit body meta =
clientRequest :: Client
-> RegisteredMethod
-> TimeoutSeconds
-- ^ Timeout of both the grpc_call and the max time to wait for
-- the completion of the batch. TODO: I think we will need to
-- decouple the lifetime of the call from the queue deadline once
-- we expose functionality for streaming calls, where one call
-- object persists across many batches.
-> ByteString
-- ^ The body of the request
-> MetadataMap
-- ^ Metadata to send with the request
-> IO (Either GRPCIOError NormalRequestResult)
clientRequest client@(Client{..}) rm@(RegisteredMethod{..})
timeLimit body meta =
fmap join $ case methodType of
Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do
grpcDebug "clientRegisteredRequest: created call."
Normal -> withClientCall client rm timeLimit $ \call -> do
grpcDebug "clientRequest(R): created call."
debugClientCall call
-- NOTE: sendOps and recvOps *must* be in separate batches or
-- the client hangs when the server can't be reached.
@ -158,7 +160,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
, OpSendCloseFromClient]
sendRes <- runClientOps call clientCQ sendOps timeLimit
case sendRes of
Left x -> do grpcDebug "clientRegisteredRequest: batch error."
Left x -> do grpcDebug "clientRequest(R) : batch error."
return $ Left x
Right rs -> do
let recvOps = [OpRecvInitialMetadata,
@ -167,7 +169,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
recvRes <- runClientOps call clientCQ recvOps timeLimit
case recvRes of
Left x -> do
grpcDebug "clientRegisteredRequest: batch error."
grpcDebug "clientRequest(R): batch error."
return $ Left x
Right rs' -> do
return $ Right $ compileNormalRequestResults (rs ++ rs')

View file

@ -11,7 +11,11 @@ import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.Client
import Network.GRPC.LowLevel.Client (Client (..),
NormalRequestResult (..),
clientEndpoint,
clientNormalRequestOps,
compileNormalRequestResults)
import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds)
import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U
import Network.GRPC.LowLevel.GRPC
@ -40,7 +44,7 @@ withClientCall client method timeout f = do
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withClientCall: destroying."
where logDestroy c = grpcDebug "withClientCall(U): destroying."
>> destroyClientCall c
-- | Makes a normal (non-streaming) request without needing to register a method
@ -61,7 +65,7 @@ clientRequest client@Client{..} meth timeLimit body meta =
withClientCall client meth timeLimit $ \call -> do
let ops = clientNormalRequestOps body meta
results <- runClientOps call clientCQ ops timeLimit
grpcDebug "clientRequest: ops ran."
grpcDebug "clientRequest(U): ops ran."
case results of
Left x -> return $ Left x
Right rs -> return $ Right $ compileNormalRequestResults rs

View file

@ -19,12 +19,12 @@ module Network.GRPC.LowLevel.CompletionQueue
, shutdownCompletionQueue
, pluck
, startBatch
, channelCreateRegisteredCall
, channelCreateCall
, TimeoutSeconds
, isEventSuccessful
, serverRegisterCompletionQueue
, serverShutdownAndNotify
, serverRequestRegisteredCall
, serverRequestCall
, newTag
)
where
@ -104,14 +104,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
C.QueueTimeout -> drainLoop
C.OpComplete -> drainLoop
channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask
-> CompletionQueue -> C.CallHandle
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateRegisteredCall
channelCreateCall :: C.Channel
-> C.Call
-> C.PropagationMask
-> CompletionQueue
-> C.CallHandle
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateCall
chan parent mask cq@CompletionQueue{..} handle deadline =
withPermission Push cq $ do
grpcDebug $ "channelCreateRegisteredCall: call with "
grpcDebug $ "channelCreateCall: call with "
++ concat (intersperse " " [show chan, show parent, show mask,
show unsafeCQ, show handle,
show deadline])
@ -120,10 +123,13 @@ channelCreateRegisteredCall
return $ Right $ ClientCall call
-- | Create the call object to handle a registered call.
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> RegisteredMethod -> MetadataMap
-> IO (Either GRPCIOError ServerRegCall)
serverRequestRegisteredCall
serverRequestCall :: C.Server
-> CompletionQueue
-> TimeoutSeconds
-> RegisteredMethod
-> MetadataMap
-> IO (Either GRPCIOError ServerCall)
serverRequestCall
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta =
withPermission Push cq $ do
-- TODO: Is gRPC supposed to populate this deadline?
@ -146,28 +152,28 @@ serverRequestRegisteredCall
callError <- C.grpcServerRequestRegisteredCall
server methodHandle callPtr deadline
metadataArray bbPtr unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestRegisteredCall: callError: "
grpcDebug $ "serverRequestCall(R): callError: "
++ show callError
if callError /= C.CallOk
then do grpcDebug "serverRequestRegisteredCall: callError. cleaning up"
then do grpcDebug "serverRequestCall(R): callError. cleaning up"
failureCleanup deadline callPtr metadataArrayPtr bbPtr
return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit
grpcDebug "serverRequestRegisteredCall: finished pluck."
grpcDebug "serverRequestCall(R): finished pluck."
case pluckResult of
Left x -> do
grpcDebug "serverRequestRegisteredCall: cleanup pluck err"
grpcDebug "serverRequestCall(R): cleanup pluck err"
failureCleanup deadline callPtr metadataArrayPtr bbPtr
return $ Left x
Right () -> do
rawCall <- peek callPtr
let assembledCall = ServerRegCall rawCall metadataArrayPtr
bbPtr Nothing deadline
let assembledCall = ServerCall rawCall metadataArrayPtr
bbPtr Nothing deadline
return $ Right assembledCall
-- TODO: see TODO for failureCleanup in serverRequestCall.
where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestRegisteredCall: doing delayed cleanup."
grpcDebug "serverRequestCall(R): doing delayed cleanup."
C.timespecDestroy deadline
free callPtr
C.metadataArrayDestroy metadataArrayPtr

View file

@ -29,8 +29,10 @@ channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
return $ Right $ ClientCall call
serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall)
serverRequestCall :: C.Server
-> CompletionQueue
-> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc

View file

@ -4,23 +4,22 @@
module Network.GRPC.LowLevel.Op where
import Control.Exception
import qualified Data.ByteString as B
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Foreign.C.String (CString)
import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc,
mallocBytes)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek, poke)
import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C
import qualified Data.ByteString as B
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Foreign.C.String (CString)
import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc,
mallocBytes)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek, poke)
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C
-- | Sum describing all possible send and receive operations that can be batched
-- and executed by gRPC. Usually these are processed in a handful of
@ -219,26 +218,26 @@ runOps call cq ops timeLimit =
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
Left err -> return $ Left err
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the given number of
-- | For a given server call, run the given 'Op's on the given completion queue
-- with the given tag. Blocks until the ops are complete or the given number of
-- seconds have elapsed. TODO: now that we distinguish between different types
-- of calls at the type level, we could try to limit the input 'Op's more
-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a
-- registered call, because gRPC handles that for us.
runServerRegOps :: ServerRegCall
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
runServerOps :: ServerCall
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-- ^ The list of 'Op's to execute.
-> TimeoutSeconds
-> TimeoutSeconds
-- ^ How long to block waiting for the tag to appear on the
--queue. If we time out, the result of this action will be
-- queue. If we time out, the result of this action will be
-- @CallBatchError BatchTimeout@.
-> IO (Either GRPCIOError [OpRecvResult])
runServerRegOps = runOps . internalServerRegCall
-> IO (Either GRPCIOError [OpRecvResult])
runServerOps = runOps . unServerCall
-- | Like 'runServerOps', but for client-side calls.
runClientOps :: ClientCall
@ -246,7 +245,7 @@ runClientOps :: ClientCall
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runClientOps = runOps . internalClientCall
runClientOps = runOps . unClientCall
-- | If response status info is present in the given 'OpRecvResult's, returns
-- a tuple of trailing metadata, status code, and status details.

View file

@ -10,4 +10,4 @@ runServerOps :: U.ServerCall
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runServerOps = runOps . U.internalServerCall
runServerOps = runOps . U.unServerCall

View file

@ -5,24 +5,23 @@
-- `Network.GRPC.LowLevel.Server.Unregistered`.
module Network.GRPC.LowLevel.Server where
import Control.Exception (bracket, finally)
import Control.Exception (bracket, finally)
import Control.Monad
import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr)
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
TimeoutSeconds,
createCompletionQueue,
pluck,
serverRegisterCompletionQueue,
serverRequestRegisteredCall,
serverShutdownAndNotify,
shutdownCompletionQueue)
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
TimeoutSeconds,
createCompletionQueue,
pluck,
serverRegisterCompletionQueue,
serverRequestCall,
serverShutdownAndNotify,
shutdownCompletionQueue)
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
-- | Wraps various gRPC state needed to run a server.
data Server = Server
@ -123,24 +122,27 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
-- | Create a 'Call' with which to wait for the invocation of a registered
-- method.
serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
-> MetadataMap
-> IO (Either GRPCIOError ServerRegCall)
serverCreateRegisteredCall Server{..} rm timeLimit initMeta =
serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta
serverCreateCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap
-> IO (Either GRPCIOError ServerCall)
serverCreateCall Server{..} rm timeLimit initMeta =
serverRequestCall internalServer serverCQ timeLimit rm initMeta
withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
-> MetadataMap
-> (ServerRegCall
-> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerRegisteredCall server regmethod timeout initMeta f = do
createResult <- serverCreateRegisteredCall server regmethod timeout initMeta
withServerCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap
-> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerCall server regmethod timeout initMeta f = do
createResult <- serverCreateCall server regmethod timeout initMeta
case createResult of
Left x -> return $ Left x
Right call -> f call `finally` logDestroy call
where logDestroy c = grpcDebug "withServerRegisteredCall: destroying."
>> destroyServerRegCall c
>> destroyServerCall c
-- | Sequence of 'Op's needed to receive a normal (non-streaming) call.
serverOpsGetNormalCall :: MetadataMap -> [Op]
@ -174,44 +176,49 @@ serverOpsSendNormalRegisteredResponse
OpSendMessage body,
OpSendStatusFromServer trailingMeta code details]
-- | A handler for an registered server call; bytestring parameter is request
-- body, with the bytestring response body in the result tuple. The first
-- metadata parameter refers to the request metadata, with the two metadata
-- values in the result tuple being the initial and trailing metadata
-- respectively.
-- TODO: make a more rigid type for this with a Maybe MetadataMap for the
-- trailing meta, and use it for both kinds of call handlers.
type ServerHandler
= ByteString -> MetadataMap
-> IO (ByteString, MetadataMap, MetadataMap, StatusDetails)
-- TODO: we will want to replace this with some more general concept that also
-- works with streaming calls in the future.
-- | Wait for and then handle a normal (non-streaming) call.
serverHandleNormalRegisteredCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap
-- ^ Initial server metadata
-> (ByteString -> MetadataMap
-> IO (ByteString,
MetadataMap,
MetadataMap,
StatusDetails))
-- ^ Handler function takes a request body and
-- metadata and returns a response body and
-- metadata.
-> IO (Either GRPCIOError ())
serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do
serverHandleNormalCall :: Server
-> RegisteredMethod
-> TimeoutSeconds
-> MetadataMap
-- ^ Initial server metadata
-> ServerHandler
-> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do
-- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit.
-- Should we just hard-code time limits instead? Not sure if client
-- programmer cares, since this function will likely just be put in a loop
-- anyway.
withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do
grpcDebug "serverHandleNormalRegisteredCall: starting batch."
debugServerRegCall call
payload <- serverRegCallGetPayload call
withServerCall s rm timeLimit srvMetadata $ \call -> do
grpcDebug "serverHandleNormalCall(R): starting batch."
debugServerCall call
payload <- serverCallGetPayload call
case payload of
--TODO: what should we do with an empty payload? Have the handler take
-- @Maybe ByteString@? Need to figure out when/why payload would be empty.
Nothing -> error "serverHandleNormalRegisteredCall: payload empty."
Nothing -> error "serverHandleNormalCall(R): payload empty."
Just requestBody -> do
requestMeta <- serverRegCallGetMetadata call
requestMeta <- serverCallGetMetadata call
(respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta
let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalRegisteredResponse
respBody initMeta trailingMeta status details
respOpsResults <- runServerRegOps call serverCQ respOps timeLimit
grpcDebug "serverHandleNormalRegisteredCall: finished response ops."
respOpsResults <- runServerOps call serverCQ respOps timeLimit
grpcDebug "serverHandleNormalCall(R): finished response ops."
case respOpsResults of
Left x -> return $ Left x
Right _ -> return $ Right ()

View file

@ -11,7 +11,9 @@ import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op (OpRecvResult (..))
import qualified Network.GRPC.LowLevel.Op.Unregistered as U
import Network.GRPC.LowLevel.Server
import Network.GRPC.LowLevel.Server (Server (..),
serverOpsGetNormalCall,
serverOpsSendNormalResponse)
import qualified Network.GRPC.Unsafe.Op as C
serverCreateCall :: Server -> TimeoutSeconds
@ -44,7 +46,7 @@ serverHandleNormalCall :: Server
-> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
withServerCall s timeLimit $ \call -> do
grpcDebug "serverHandleCall(U): starting batch."
grpcDebug "serverHandleNormalCall(U): starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata
opResults <- U.runServerOps call serverCQ recvOps timeLimit
case opResults of

View file

@ -11,7 +11,8 @@ import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.Map as M
import Network.GRPC.LowLevel
import Network.GRPC.LowLevel.Server.Unregistered as U
import qualified Network.GRPC.LowLevel.Client.Unregistered as U
import qualified Network.GRPC.LowLevel.Server.Unregistered as U
import Test.Tasty
import Test.Tasty.HUnit as HU (Assertion,
assertEqual,
@ -50,14 +51,14 @@ testClientCreateDestroy =
testClientCall :: TestTree
testClientCall =
clientOnlyTest "create/destroy call" $ \c -> do
r <- withClientCall c "foo" 10 $ const $ return $ Right ()
r <- U.withClientCall c "/foo" 10 $ const $ return $ Right ()
r @?= Right ()
testClientTimeoutNoServer :: TestTree
testClientTimeoutNoServer =
clientOnlyTest "request timeout when server DNE" $ \c -> do
rm <- clientRegisterMethod c "/foo" Normal
r <- clientRegisteredRequest c rm 1 "Hello" mempty
r <- clientRequest c rm 1 "Hello" mempty
r @?= Left GRPCIOTimeout
testServerCreateDestroy :: TestTree
@ -74,7 +75,7 @@ testServerTimeoutNoClient :: TestTree
testServerTimeoutNoClient =
serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do
let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 1 mempty $ \_ _ ->
r <- serverHandleNormalCall s rm 1 mempty $ \_ _ ->
return ("", mempty, mempty, StatusDetails "details")
r @?= Left GRPCIOTimeout
@ -89,13 +90,13 @@ testWrongEndpoint =
-- further
client c = do
rm <- clientRegisterMethod c "/bar" Normal
r <- clientRegisteredRequest c rm 1 "Hello!" mempty
r <- clientRequest c rm 1 "Hello!" mempty
r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded
(StatusDetails "Deadline Exceeded"))
server s = do
length (registeredMethods s) @?= 1
let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 10 mempty $ \_ _ -> do
r <- serverHandleNormalCall s rm 10 mempty $ \_ _ -> do
return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string")
r @?= Right ()
@ -112,7 +113,7 @@ testPayload =
clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")]
client c = do
rm <- clientRegisterMethod c "/foo" Normal
clientRegisteredRequest c rm 10 "Hello!" clientMD >>= do
clientRequest c rm 10 "Hello!" clientMD >>= do
checkReqRslt $ \NormalRequestResult{..} -> do
rspCode @?= GrpcStatusOk
rspBody @?= "reply test"
@ -122,7 +123,7 @@ testPayload =
server s = do
length (registeredMethods s) @?= 1
let rm = head (registeredMethods s)
r <- serverHandleNormalRegisteredCall s rm 11 mempty $ \reqBody reqMD -> do
r <- serverHandleNormalCall s rm 11 mempty $ \reqBody reqMD -> do
reqBody @?= "Hello!"
checkMD "Server metadata mismatch" clientMD reqMD
return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string")
@ -133,13 +134,13 @@ testPayloadUnregistered =
csTest "unregistered normal request/response" client server []
where
client c = do
clientRequest c "/foo" 10 "Hello!" mempty >>= do
U.clientRequest c "/foo" 10 "Hello!" mempty >>= do
checkReqRslt $ \NormalRequestResult{..} -> do
rspCode @?= GrpcStatusOk
rspBody @?= "reply test"
details @?= "details string"
server s = do
r <- serverHandleNormalCall s 11 mempty $ \body _md meth -> do
r <- U.serverHandleNormalCall s 11 mempty $ \body _md meth -> do
body @?= "Hello!"
meth @?= "/foo"
return ("reply test", mempty, "details string")