Split up CompletionQueue into CompletionQueue.{Unregistered,Internal} and add Unregistered namespace for Op

This commit is contained in:
Joel Stanley 2016-06-08 12:45:47 -05:00
parent eb1040d07b
commit e8d3e6450e
11 changed files with 261 additions and 238 deletions

View file

@ -45,8 +45,11 @@ library
Network.GRPC.LowLevel.Server.Unregistered Network.GRPC.LowLevel.Server.Unregistered
other-modules: other-modules:
Network.GRPC.LowLevel.CompletionQueue Network.GRPC.LowLevel.CompletionQueue
Network.GRPC.LowLevel.CompletionQueue.Internal
Network.GRPC.LowLevel.CompletionQueue.Unregistered
Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.GRPC
Network.GRPC.LowLevel.Op Network.GRPC.LowLevel.Op
Network.GRPC.LowLevel.Op.Unregistered
Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Server
Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call
Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Call.Unregistered

View file

@ -50,7 +50,6 @@ GRPC
-- * Ops -- * Ops
, runClientOps , runClientOps
, runServerRegOps , runServerRegOps
, runServerUnregOps
, Op(..) , Op(..)
, OpRecvResult(..) , OpRecvResult(..)
@ -63,7 +62,6 @@ import Network.GRPC.LowLevel.Op
import Network.GRPC.LowLevel.Client import Network.GRPC.LowLevel.Client
import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Client.Unregistered
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.Call.Unregistered
import Network.GRPC.Unsafe (ConnectivityState(..)) import Network.GRPC.Unsafe (ConnectivityState(..))
import Network.GRPC.Unsafe.Op (StatusCode(..)) import Network.GRPC.Unsafe.Op (StatusCode(..))

View file

@ -2,17 +2,18 @@
module Network.GRPC.LowLevel.Client.Unregistered where module Network.GRPC.LowLevel.Client.Unregistered where
import Control.Exception (finally) import Control.Exception (finally)
import Control.Monad (join) import Control.Monad (join)
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr) import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C import qualified Network.GRPC.Unsafe.Time as C
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.Client import Network.GRPC.LowLevel.Client
import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds)
import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Op
@ -26,8 +27,8 @@ clientCreateCall :: Client
clientCreateCall Client{..} meth timeout = do clientCreateCall Client{..} meth timeout = do
let parentCall = C.Call nullPtr let parentCall = C.Call nullPtr
C.withDeadlineSeconds timeout $ \deadline -> do C.withDeadlineSeconds timeout $ \deadline -> do
channelCreateCall clientChannel parentCall C.propagateDefaults U.channelCreateCall clientChannel parentCall C.propagateDefaults
clientCQ meth (clientEndpoint clientConfig) deadline clientCQ meth (clientEndpoint clientConfig) deadline
withClientCall :: Client withClientCall :: Client
-> MethodName -> MethodName

View file

@ -14,30 +14,23 @@ module Network.GRPC.LowLevel.CompletionQueue
, pluck , pluck
, startBatch , startBatch
, channelCreateRegisteredCall , channelCreateRegisteredCall
, channelCreateCall
, TimeoutSeconds , TimeoutSeconds
, isEventSuccessful , isEventSuccessful
, serverRegisterCompletionQueue , serverRegisterCompletionQueue
, serverShutdownAndNotify , serverShutdownAndNotify
, serverRequestRegisteredCall , serverRequestRegisteredCall
, serverRequestCall
, newTag , newTag
) )
where where
import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, check, import Control.Concurrent.STM (atomically, check)
retry) import Control.Concurrent.STM.TVar (newTVarIO, readTVar,
import Control.Concurrent.STM.TVar (TVar, modifyTVar',
newTVarIO, readTVar,
writeTVar) writeTVar)
import Control.Exception (bracket) import Control.Exception (bracket)
import Data.IORef (IORef, import Data.IORef (newIORef)
atomicModifyIORef',
newIORef)
import Data.List (intersperse) import Data.List (intersperse)
import Foreign.Marshal.Alloc (free, malloc) import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Ptr (nullPtr, plusPtr)
import Foreign.Storable (peek) import Foreign.Storable (peek)
import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Constants as C
@ -47,100 +40,8 @@ import qualified Network.GRPC.Unsafe.Time as C
import System.Timeout (timeout) import System.Timeout (timeout)
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.CompletionQueue.Internal
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, which must reach zero before the shutdown
-- can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches gRPC operations ('Op's) to finish running, as
-- well as wait for various other operations, such as server shutdown, pinging,
-- checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
-- ^ All access to this field must be
-- guarded by a check of 'shuttingDown'.
currentPluckers :: TVar Int,
-- ^ Used to limit the number of
-- concurrent calls to pluck on this
-- queue.
-- The max value is set by gRPC in
-- 'C.maxCompletionQueuePluckers'
currentPushers :: TVar Int,
-- ^ Used to prevent new work from
-- being pushed onto the queue when
-- the queue begins to shut down.
shuttingDown :: TVar Bool,
-- ^ Used to prevent new pluck calls on
-- the queue when the queue begins to
-- shut down.
nextTag :: IORef Int
-- ^ Used to supply unique tags for work
-- items pushed onto the queue.
}
-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'.
-- This will eventually wrap around after reaching @maxBound :: Int@, but from a
-- practical perspective, that should be safe.
newTag :: CompletionQueue -> IO C.Tag
newTag CompletionQueue{..} = do
i <- atomicModifyIORef' nextTag (\i -> (i+1,i))
return $ C.Tag $ plusPtr nullPtr i
maxWorkPushers :: Int
maxWorkPushers = 100 --TODO: figure out what this should be.
data CQOpType = Push | Pluck deriving (Show, Eq, Enum)
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount Push = currentPushers
getCount Pluck = currentPluckers
getLimit :: CQOpType -> Int
getLimit Push = maxWorkPushers
getLimit Pluck = C.maxCompletionQueuePluckers
-- | Safely brackets an operation that pushes work onto or plucks results from
-- the given 'CompletionQueue'.
withPermission :: CQOpType
-> CompletionQueue
-> IO (Either GRPCIOError a)
-> IO (Either GRPCIOError a)
withPermission op cq f =
bracket acquire release doOp
where acquire = atomically $ do
isShuttingDown <- readTVar (shuttingDown cq)
if isShuttingDown
then return False
else do currCount <- readTVar $ getCount op cq
if currCount < getLimit op
then modifyTVar' (getCount op cq) (+1) >> return True
else retry
doOp gotResource = if gotResource
then f
else return $ Left GRPCIOShutdown
release gotResource =
if gotResource
then atomically $ modifyTVar' (getCount op cq) (subtract 1)
else return ()
withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a
withCompletionQueue grpc = bracket (createCompletionQueue grpc) withCompletionQueue grpc = bracket (createCompletionQueue grpc)
@ -155,35 +56,6 @@ createCompletionQueue _ = do
nextTag <- newIORef minBound nextTag <- newIORef minBound
return $ CompletionQueue{..} return $ CompletionQueue{..}
type TimeoutSeconds = Int
-- | Translate 'C.Event' to an error. The caller is responsible for ensuring
-- that the event actually corresponds to an error condition; a successful event
-- will be translated to a 'GRPCIOUnknownError'.
eventToError :: C.Event -> (Either GRPCIOError a)
eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
eventToError _ = Left GRPCIOUnknownError
-- | Returns true iff the given grpc_event was a success.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful (C.Event C.OpComplete True _) = True
isEventSuccessful _ = False
-- | Waits for the given number of seconds for the given tag to appear on the
-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
-- down and cannot handle new requests.
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
-> IO (Either GRPCIOError ())
pluck cq@CompletionQueue{..} tag waitSeconds = do
grpcDebug $ "pluck: called with tag: " ++ show tag
++ " and wait: " ++ show waitSeconds
withPermission Pluck cq $ do
C.withDeadlineSeconds waitSeconds $ \deadline -> do
ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
grpcDebug $ "pluck: finished. Event: " ++ show ev
return $ if isEventSuccessful ev then Right () else eventToError ev
-- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere
-- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the
-- case, switch these to 'Maybe'. -- case, switch these to 'Maybe'.
@ -241,20 +113,6 @@ channelCreateRegisteredCall
handle deadline C.reserved handle deadline C.reserved
return $ Right $ ClientCall call return $ Right $ ClientCall call
channelCreateCall :: C.Channel
-> C.Call
-> C.PropagationMask
-> CompletionQueue
-> MethodName
-> Endpoint
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
withPermission Push cq $ do
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ
(unMethodName meth) (unEndpoint endpt) deadline C.reserved
return $ Right $ ClientCall call
-- | Create the call object to handle a registered call. -- | Create the call object to handle a registered call.
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> RegisteredMethod -> MetadataMap -> RegisteredMethod -> MetadataMap
@ -309,54 +167,6 @@ serverRequestRegisteredCall
C.metadataArrayDestroy metadataArrayPtr C.metadataArrayDestroy metadataArrayPtr
free bbPtr free bbPtr
serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc
grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr
callDetails <- C.createCallDetails
metadataArrayPtr <- C.metadataArrayCreate
metadataArray <- peek metadataArrayPtr
tag <- newTag cq
callError <- C.grpcServerRequestCall server callPtr callDetails
metadataArray unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall: callError was " ++ show callError
if callError /= C.CallOk
then do grpcDebug "serverRequestCall: got call error; cleaning up."
failureCleanup callPtr callDetails metadataArrayPtr
return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit
grpcDebug $ "serverRequestCall: pluckResult was "
++ show pluckResult
case pluckResult of
Left x -> do
grpcDebug "serverRequestCall: pluck error; cleaning up."
failureCleanup callPtr callDetails
metadataArrayPtr
return $ Left x
Right () -> do
rawCall <- peek callPtr
let call = U.ServerCall rawCall
metadataArrayPtr
Nothing
callDetails
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random
-- amount of time, even after returning from the only call that uses them.
-- This results in malloc errors if
-- gRPC tries to modify them after we free them. To work around it,
-- we sleep for a while before freeing the objects. We should find a
-- permanent solution that's more robust.
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall: doing delayed cleanup."
free callPtr
C.destroyCallDetails callDetails
C.metadataArrayDestroy metadataArrayPtr
return ()
-- | Register the server's completion queue. Must be done before the server is -- | Register the server's completion queue. Must be done before the server is
-- started. -- started.
serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO () serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO ()
@ -366,6 +176,3 @@ serverRegisterCompletionQueue server CompletionQueue{..} =
serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO ()
serverShutdownAndNotify server CompletionQueue{..} tag = serverShutdownAndNotify server CompletionQueue{..} tag =
C.grpcServerShutdownAndNotify server unsafeCQ tag C.grpcServerShutdownAndNotify server unsafeCQ tag
threadDelaySecs :: Int -> IO ()
threadDelaySecs = threadDelay . (* 10^(6::Int))

View file

@ -0,0 +1,134 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Internal where
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar)
import Control.Exception (bracket)
import Data.IORef (IORef, atomicModifyIORef')
import Foreign.Ptr (nullPtr, plusPtr)
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, which must reach zero before the shutdown
-- can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches gRPC operations ('Op's) to finish running, as
-- well as wait for various other operations, such as server shutdown, pinging,
-- checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
-- ^ All access to this field must be
-- guarded by a check of 'shuttingDown'.
currentPluckers :: TVar Int,
-- ^ Used to limit the number of
-- concurrent calls to pluck on this
-- queue.
-- The max value is set by gRPC in
-- 'C.maxCompletionQueuePluckers'
currentPushers :: TVar Int,
-- ^ Used to prevent new work from
-- being pushed onto the queue when
-- the queue begins to shut down.
shuttingDown :: TVar Bool,
-- ^ Used to prevent new pluck calls on
-- the queue when the queue begins to
-- shut down.
nextTag :: IORef Int
-- ^ Used to supply unique tags for work
-- items pushed onto the queue.
}
type TimeoutSeconds = Int
data CQOpType = Push | Pluck deriving (Show, Eq, Enum)
-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'.
-- This will eventually wrap around after reaching @maxBound :: Int@, but from a
-- practical perspective, that should be safe.
newTag :: CompletionQueue -> IO C.Tag
newTag CompletionQueue{..} = do
i <- atomicModifyIORef' nextTag (\i -> (i+1,i))
return $ C.Tag $ plusPtr nullPtr i
-- | Safely brackets an operation that pushes work onto or plucks results from
-- the given 'CompletionQueue'.
withPermission :: CQOpType
-> CompletionQueue
-> IO (Either GRPCIOError a)
-> IO (Either GRPCIOError a)
withPermission op cq f =
bracket acquire release doOp
where acquire = atomically $ do
isShuttingDown <- readTVar (shuttingDown cq)
if isShuttingDown
then return False
else do currCount <- readTVar $ getCount op cq
if currCount < getLimit op
then modifyTVar' (getCount op cq) (+1) >> return True
else retry
doOp gotResource = if gotResource
then f
else return $ Left GRPCIOShutdown
release gotResource =
if gotResource
then atomically $ modifyTVar' (getCount op cq) (subtract 1)
else return ()
-- | Waits for the given number of seconds for the given tag to appear on the
-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
-- down and cannot handle new requests.
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
-> IO (Either GRPCIOError ())
pluck cq@CompletionQueue{..} tag waitSeconds = do
grpcDebug $ "pluck: called with tag: " ++ show tag
++ " and wait: " ++ show waitSeconds
withPermission Pluck cq $ do
C.withDeadlineSeconds waitSeconds $ \deadline -> do
ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
grpcDebug $ "pluck: finished. Event: " ++ show ev
return $ if isEventSuccessful ev then Right () else eventToError ev
-- | Translate 'C.Event' to an error. The caller is responsible for ensuring
-- that the event actually corresponds to an error condition; a successful event
-- will be translated to a 'GRPCIOUnknownError'.
eventToError :: C.Event -> (Either GRPCIOError a)
eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
eventToError _ = Left GRPCIOUnknownError
-- | Returns true iff the given grpc_event was a success.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful (C.Event C.OpComplete True _) = True
isEventSuccessful _ = False
maxWorkPushers :: Int
maxWorkPushers = 100 --TODO: figure out what this should be.
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount Push = currentPushers
getCount Pluck = currentPluckers
getLimit :: CQOpType -> Int
getLimit Push = maxWorkPushers
getLimit Pluck = C.maxCompletionQueuePluckers

View file

@ -0,0 +1,78 @@
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Unregistered where
import Control.Concurrent (forkIO)
import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Storable (peek)
import Network.GRPC.LowLevel.Call
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
import Network.GRPC.LowLevel.CompletionQueue.Internal
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Time as C
channelCreateCall :: C.Channel
-> C.Call
-> C.PropagationMask
-> CompletionQueue
-> MethodName
-> Endpoint
-> C.CTimeSpecPtr
-> IO (Either GRPCIOError ClientCall)
channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
withPermission Push cq $ do
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ
(unMethodName meth) (unEndpoint endpt) deadline C.reserved
return $ Right $ ClientCall call
serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds
-> IO (Either GRPCIOError U.ServerCall)
serverRequestCall server cq@CompletionQueue{..} timeLimit =
withPermission Push cq $ do
callPtr <- malloc
grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr
callDetails <- C.createCallDetails
metadataArrayPtr <- C.metadataArrayCreate
metadataArray <- peek metadataArrayPtr
tag <- newTag cq
callError <- C.grpcServerRequestCall server callPtr callDetails
metadataArray unsafeCQ unsafeCQ tag
grpcDebug $ "serverRequestCall: callError was " ++ show callError
if callError /= C.CallOk
then do grpcDebug "serverRequestCall: got call error; cleaning up."
failureCleanup callPtr callDetails metadataArrayPtr
return $ Left $ GRPCIOCallError callError
else do pluckResult <- pluck cq tag timeLimit
grpcDebug $ "serverRequestCall: pluckResult was "
++ show pluckResult
case pluckResult of
Left x -> do
grpcDebug "serverRequestCall: pluck error; cleaning up."
failureCleanup callPtr callDetails
metadataArrayPtr
return $ Left x
Right () -> do
rawCall <- peek callPtr
let call = U.ServerCall rawCall
metadataArrayPtr
Nothing
callDetails
return $ Right call
--TODO: the gRPC library appears to hold onto these pointers for a random
-- amount of time, even after returning from the only call that uses them.
-- This results in malloc errors if
-- gRPC tries to modify them after we free them. To work around it,
-- we sleep for a while before freeing the objects. We should find a
-- permanent solution that's more robust.
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
threadDelaySecs 30
grpcDebug "serverRequestCall: doing delayed cleanup."
free callPtr
C.destroyCallDetails callDetails
C.metadataArrayDestroy metadataArrayPtr
return ()

View file

@ -3,12 +3,7 @@
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
module Network.GRPC.LowLevel.GRPC where module Network.GRPC.LowLevel.GRPC where
{- import Control.Concurrent (threadDelay)
-- TODO: remove if not needed
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Except (ExceptT(..), runExceptT, throwError,
MonadError)
-}
import Control.Exception import Control.Exception
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.Map as M import qualified Data.Map as M
@ -63,6 +58,9 @@ grpcDebug str = do tid <- myThreadId
grpcDebug _ = return () grpcDebug _ = return ()
#endif #endif
threadDelaySecs :: Int -> IO ()
threadDelaySecs = threadDelay . (* 10^(6::Int))
{- {-
-- TODO: remove this once finally decided on whether to use it. -- TODO: remove this once finally decided on whether to use it.
-- | Monad for running gRPC operations. -- | Monad for running gRPC operations.

View file

@ -7,7 +7,6 @@ import Control.Exception
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes) import Data.Maybe (catMaybes)
import Data.String (IsString)
import Foreign.C.String (CString) import Foreign.C.String (CString)
import Foreign.C.Types (CInt) import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc, import Foreign.Marshal.Alloc (free, malloc,
@ -20,7 +19,6 @@ import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Op as C
import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.Call.Unregistered as U
import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
@ -224,7 +222,7 @@ runOps call cq ops timeLimit =
-- | For a given call, run the given 'Op's on the given completion queue with -- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the given number of -- the given tag. Blocks until the ops are complete or the given number of
-- seconds have elapsed. -- seconds have elapsed.
-- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we -- TODO: now that 'ServerRegCall' and 'U.ServerCall' are separate types, we
-- could try to limit the input 'Op's more appropriately. E.g., we don't use -- could try to limit the input 'Op's more appropriately. E.g., we don't use
-- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC -- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC
-- handles that for us. -- handles that for us.
@ -243,13 +241,6 @@ runServerRegOps :: ServerRegCall
-> IO (Either GRPCIOError [OpRecvResult]) -> IO (Either GRPCIOError [OpRecvResult])
runServerRegOps = runOps . internalServerRegCall runServerRegOps = runOps . internalServerRegCall
runServerUnregOps :: U.ServerCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runServerUnregOps = runOps . U.internalServerCall
-- | Like 'runServerOps', but for client-side calls. -- | Like 'runServerOps', but for client-side calls.
runClientOps :: ClientCall runClientOps :: ClientCall
-> CompletionQueue -> CompletionQueue
@ -263,6 +254,6 @@ runClientOps = runOps . internalClientCall
extractStatusInfo :: [OpRecvResult] extractStatusInfo :: [OpRecvResult]
-> Maybe (MetadataMap, C.StatusCode, B.ByteString) -> Maybe (MetadataMap, C.StatusCode, B.ByteString)
extractStatusInfo [] = Nothing extractStatusInfo [] = Nothing
extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) =
Just (meta, code, details) Just (meta, code, details)
extractStatusInfo (_:xs) = extractStatusInfo xs extractStatusInfo (_:xs) = extractStatusInfo xs

View file

@ -0,0 +1,13 @@
module Network.GRPC.LowLevel.Op.Unregistered where
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.Call.Unregistered as U
runServerOps :: U.ServerCall
-> CompletionQueue
-> [Op]
-> TimeoutSeconds
-> IO (Either GRPCIOError [OpRecvResult])
runServerOps = runOps . U.internalServerCall

View file

@ -15,7 +15,6 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
createCompletionQueue, createCompletionQueue,
pluck, pluck,
serverRegisterCompletionQueue, serverRegisterCompletionQueue,
serverRequestCall,
serverRequestRegisteredCall, serverRequestRegisteredCall,
serverShutdownAndNotify, serverShutdownAndNotify,
shutdownCompletionQueue) shutdownCompletionQueue)

View file

@ -2,21 +2,22 @@
module Network.GRPC.LowLevel.Server.Unregistered where module Network.GRPC.LowLevel.Server.Unregistered where
import Control.Exception (finally) import Control.Exception (finally)
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Network.GRPC.LowLevel.Call (MethodName) import Network.GRPC.LowLevel.Call (MethodName)
import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.LowLevel.Call.Unregistered
import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds, import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds)
serverRequestCall) import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U
import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Op (OpRecvResult (..))
import qualified Network.GRPC.LowLevel.Op.Unregistered as U
import Network.GRPC.LowLevel.Server import Network.GRPC.LowLevel.Server
import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Op as C
serverCreateCall :: Server -> TimeoutSeconds serverCreateCall :: Server -> TimeoutSeconds
-> IO (Either GRPCIOError ServerCall) -> IO (Either GRPCIOError ServerCall)
serverCreateCall Server{..} timeLimit = serverCreateCall Server{..} timeLimit =
serverRequestCall internalServer serverCQ timeLimit U.serverRequestCall internalServer serverCQ timeLimit
withServerCall :: Server -> TimeoutSeconds withServerCall :: Server -> TimeoutSeconds
-> (ServerCall -> IO (Either GRPCIOError a)) -> (ServerCall -> IO (Either GRPCIOError a))
@ -45,7 +46,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
withServerCall s timeLimit $ \call -> do withServerCall s timeLimit $ \call -> do
grpcDebug "serverHandleCall(U): starting batch." grpcDebug "serverHandleCall(U): starting batch."
let recvOps = serverOpsGetNormalCall srvMetadata let recvOps = serverOpsGetNormalCall srvMetadata
opResults <- runServerUnregOps call serverCQ recvOps timeLimit opResults <- U.runServerOps call serverCQ recvOps timeLimit
case opResults of case opResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
return $ Left x return $ Left x
@ -59,7 +60,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
let status = C.GrpcStatusOk let status = C.GrpcStatusOk
let respOps = serverOpsSendNormalResponse let respOps = serverOpsSendNormalResponse
respBody respMetadata status details respBody respMetadata status details
respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit respOpsResults <- U.runServerOps call serverCQ respOps timeLimit
case respOpsResults of case respOpsResults of
Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed."
return $ Left x return $ Left x