
176 lines
8.5 KiB
Raw Normal View History

{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Internal where
import Control.Concurrent.STM (atomically, retry, check)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar,
                                    writeTVar)
import Control.Exception (bracket)
import Control.Monad
import Data.IORef (IORef, atomicModifyIORef')
import Foreign.Ptr (nullPtr, plusPtr)
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
import System.Timeout (timeout)
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, which must reach zero before the shutdown
-- can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches of gRPC operations ('Op's) to finish running, as
-- well as to wait for various other operations, such as server shutdown,
-- pinging, checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue
  { unsafeCQ :: C.CompletionQueue
    -- ^ The underlying gRPC completion queue. All access to this field must
    -- be guarded by a check of 'shuttingDown'.
  , currentPluckers :: TVar Int
    -- ^ Number of pluck calls currently in flight. Used to limit the number
    -- of concurrent calls to pluck on this queue. The max value is set by
    -- gRPC in 'C.maxCompletionQueuePluckers'.
  , currentPushers :: TVar Int
    -- ^ Number of threads currently pushing work onto the queue. Used to
    -- prevent the queue from shutting down while work is still being pushed;
    -- capped at 'maxWorkPushers'.
  , shuttingDown :: TVar Bool
    -- ^ Set to 'True' when the queue begins to shut down. Used to prevent new
    -- push and pluck calls from starting after that point.
  , nextTag :: IORef Int
    -- ^ Used to supply unique tags for work items pushed onto the queue.
  }
-- | Shows a 'CompletionQueue' by showing the underlying 'unsafeCQ' handle; the
-- bookkeeping fields are not rendered.
instance Show CompletionQueue where
  show cq = show (unsafeCQ cq)
-- | A timeout expressed as a whole number of seconds, as accepted by 'pluck'.
type TimeoutSeconds = Int
-- | The kinds of completion queue access that 'withPermission' can guard.
data CQOpType
  = Push   -- ^ Pushing new work onto the queue.
  | Pluck  -- ^ Plucking a finished work item off the queue.
  deriving (Show, Eq, Enum)
-- | Allocate a fresh 'C.Tag' for identifying a work item on the given
-- 'CompletionQueue'. The tag is taken from the queue's 'nextTag' counter, which
-- is atomically incremented on every call. The counter will eventually wrap
-- around after reaching @maxBound :: Int@, but from a practical perspective
-- that should be safe.
newTag :: CompletionQueue -> IO C.Tag
newTag cq = toTag <$> atomicModifyIORef' (nextTag cq) bump
  where
    -- Store the successor, hand back the value that was current.
    bump current = (current + 1, current)
    -- The tag is the counter value encoded as an offset from the null pointer.
    toTag = C.Tag . plusPtr nullPtr
-- | Safely brackets an operation that pushes work onto or plucks results from
-- the given 'CompletionQueue'.
--
-- Before running the action, the counter selected by the 'CQOpType' (see
-- 'getCount') is incremented; it is decremented again when the action finishes
-- or throws. If the counter is already at its limit (see 'getLimit'), this
-- blocks until another operation of the same kind finishes.
--
-- If the queue is shutting down, the action is not run at all and
-- @Left GRPCIOShutdown@ is returned.
withPermission :: CQOpType
               -> CompletionQueue
               -> IO (Either GRPCIOError a)
               -> IO (Either GRPCIOError a)
withPermission op cq act = bracket acquire release $ \gotResource ->
  if gotResource then act else return (Left GRPCIOShutdown)
  where
    counter = getCount op cq
    -- Returns True iff a slot was reserved. The shutdown check and the
    -- increment happen in one transaction, so a slot can never be taken after
    -- 'shuttingDown' has been observed as True.
    acquire = atomically $ do
      isShuttingDown <- readTVar (shuttingDown cq)
      unless isShuttingDown $ do
        currCount <- readTVar counter
        if currCount < getLimit op
          then writeTVar counter (currCount + 1)
          else retry
      return (not isShuttingDown)
    -- Only give a slot back if one was actually reserved.
    release gotResource = when gotResource $
      atomically $ modifyTVar' counter (subtract 1)
-- | Waits for the given tag to appear on the completion queue, for at most the
-- given number of seconds. Returns @Left GRPCIOShutdown@ if the completion
-- queue is shutting down and cannot handle new requests. Note that the timeout
-- is optional. When doing client ops, provide @Nothing@ and the pluck will
-- automatically fail if the deadline associated with the 'ClientCall' expires.
-- If plucking 'serverRequestCall', this will block forever unless a timeout is
-- given.
pluck :: CompletionQueue -> C.Tag -> Maybe TimeoutSeconds
         -> IO (Either GRPCIOError ())
pluck cq tag mwait = do
  grpcDebug ("pluck: called with tag=" ++ show tag ++ ",mwait=" ++ show mwait)
  -- Reserve a plucker slot for the duration of the blocking call.
  withPermission Pluck cq (pluck' cq tag mwait)
-- | Variant of 'pluck' which assumes pluck permission has already been
-- granted, i.e. it does not go through 'withPermission' itself. With @Nothing@
-- as the timeout the pluck is issued with an infinite deadline.
pluck' :: CompletionQueue
       -> C.Tag
       -> Maybe TimeoutSeconds
       -> IO (Either GRPCIOError ())
pluck' cq tag mwait = withDeadline waitForTag
  where
    -- Pick the deadline combinator matching the optional timeout.
    withDeadline = case mwait of
      Nothing   -> C.withInfiniteDeadline
      Just secs -> C.withDeadlineSeconds secs
    waitForTag deadline = do
      grpcDebug ("pluck: blocking on grpc_completion_queue_pluck for tag=" ++ show tag)
      event <- C.grpcCompletionQueuePluck (unsafeCQ cq) tag deadline C.reserved
      grpcDebug ("pluck finished: " ++ show event)
      return $ if isEventSuccessful event
                 then Right ()
                 else eventToError event
-- | Convert a 'C.Event' into the matching 'GRPCIOError', always as a 'Left'.
-- It is up to the caller to make sure the event really describes a failure: a
-- successful event falls through to 'GRPCIOUnknownError'.
eventToError :: C.Event -> Either GRPCIOError a
eventToError event = case event of
  C.Event C.QueueShutdown _ _  -> Left GRPCIOShutdown
  C.Event C.QueueTimeout _ _   -> Left GRPCIOTimeout
  -- A completed-but-unsuccessful op is reported as a timeout.
  C.Event C.OpComplete False _ -> Left GRPCIOTimeout
  _                            -> Left GRPCIOUnknownError
-- | 'True' exactly when the given grpc_event is an 'C.OpComplete' event whose
-- success flag is set; every other event counts as unsuccessful.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful event = case event of
  C.Event C.OpComplete True _ -> True
  _                           -> False
-- | Upper bound on the number of concurrent 'Push' operations admitted by
-- 'withPermission' (via 'getLimit'). The value is a placeholder chosen by this
-- module; see the TODO below.
maxWorkPushers :: Int
maxWorkPushers = 100 --TODO: figure out what this should be.
-- | Select the in-flight counter of the 'CompletionQueue' that tracks the
-- given kind of operation.
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount op = case op of
  Push  -> currentPushers
  Pluck -> currentPluckers
-- | The maximum number of operations of the given kind that may be in flight
-- on a queue at the same time.
getLimit :: CQOpType -> Int
getLimit op = case op of
  Push  -> maxWorkPushers
  Pluck -> C.maxCompletionQueuePluckers
-- | Shuts down the completion queue. The shutdown proceeds in stages so that
-- no one tries to use the queue after the shutdown process has begun:
--
-- 1. 'shuttingDown' is set, so that no new push or pluck is admitted.
-- 2. We block until both 'currentPushers' and 'currentPluckers' are zero.
-- 3. The underlying queue is told to shut down and is drained with next()
--    until it reports 'C.QueueShutdown', after which it is destroyed.
--
-- Returns @Left GRPCIOShutdownFailure@ if the queue can't be drained within 5
-- seconds; in that case the underlying queue is not destroyed.
shutdownCompletionQueue :: CompletionQueue -> IO (Either GRPCIOError ())
shutdownCompletionQueue cq = do
  atomically $ writeTVar (shuttingDown cq) True
  atomically $ do
    pushers <- readTVar (currentPushers cq)
    check (pushers == 0)
    pluckers <- readTVar (currentPluckers cq)
    check (pluckers == 0)
  --drain the queue
  C.grpcCompletionQueueShutdown rawCQ
  drained <- timeout drainBudgetMicros drainLoop
  grpcDebug ("Got CQ loop shutdown result of: " ++ show drained)
  case drained of
    Just () -> do
      C.grpcCompletionQueueDestroy rawCQ
      return (Right ())
    Nothing -> return (Left GRPCIOShutdownFailure)
  where
    rawCQ = unsafeCQ cq

    -- Five seconds, in the microseconds expected by 'timeout'.
    drainBudgetMicros :: Int
    drainBudgetMicros = 5 * 10 ^ (6 :: Int)

    -- Keep calling next() (each call bounded by a one second deadline) until
    -- the queue reports that it has shut down.
    drainLoop :: IO ()
    drainLoop = do
      grpcDebug "drainLoop: before next() call"
      event <- C.withDeadlineSeconds 1 $ \deadline ->
        C.grpcCompletionQueueNext rawCQ deadline C.reserved
      grpcDebug ("drainLoop: next() call got " ++ show event)
      case C.eventCompletionType event of
        C.QueueShutdown -> return ()
        C.QueueTimeout  -> drainLoop
        C.OpComplete    -> drainLoop