{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.CompletionQueue.Internal where
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar)
import Control.Exception (bracket)
import Data.IORef (IORef, atomicModifyIORef')
import Foreign.Ptr (nullPtr, plusPtr)
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Constants as C
import qualified Network.GRPC.Unsafe.Time as C
-- NOTE: the concurrency requirements for a CompletionQueue are a little
-- complicated. There are two read operations: next and pluck. We can either
-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times
-- concurrently, but we can't mix next and pluck calls. Fortunately, we only
-- need to use next when we are shutting down the queue. Thus, we do two things
-- to shut down:
-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck
-- calls will be allowed to start.
-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'.
-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'.
-- NOTE: There is one more possible race condition: pushing work onto the queue
-- after we begin to shut down.
-- Solution: another counter, 'currentPushers', which must reach zero before
-- the shutdown can start.
-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of
-- concurrent pushers to the CQ, but I don't know what the limit should be set
-- to. I haven't found any documentation that suggests there is a limit imposed
-- by the gRPC library, but there might be. Need to investigate further.
-- | Wraps the state necessary to use a gRPC completion queue. Completion queues
-- are used to wait for batches of gRPC operations ('Op's) to finish running, as
-- well as wait for various other operations, such as server shutdown, pinging,
-- checking to see if we've been disconnected, and so forth.
data CompletionQueue = CompletionQueue
  { unsafeCQ :: C.CompletionQueue
    -- ^ All access to this field must be guarded by a check of
    -- 'shuttingDown'.
  , currentPluckers :: TVar Int
    -- ^ Used to limit the number of concurrent calls to pluck on this queue.
    -- The max value is set by gRPC in 'C.maxCompletionQueuePluckers'.
  , currentPushers :: TVar Int
    -- ^ Used to prevent new work from being pushed onto the queue when the
    -- queue begins to shut down.
  , shuttingDown :: TVar Bool
    -- ^ Used to prevent new pluck calls on the queue when the queue begins to
    -- shut down.
  , nextTag :: IORef Int
    -- ^ Used to supply unique tags for work items pushed onto the queue.
  }
-- | A timeout expressed as a whole number of seconds.
type TimeoutSeconds = Int
-- | The kinds of completion-queue operation that 'withPermission' guards.
-- Each kind has its own in-flight counter ('getCount') and limit ('getLimit').
data CQOpType
  = Push
  | Pluck
  deriving (Show, Eq, Enum)
-- | Allocate a fresh 'C.Tag' for a work item on the given 'CompletionQueue'.
--
-- The queue's 'nextTag' counter is atomically incremented and its previous
-- value is encoded as a pointer offset from 'nullPtr'. The counter will wrap
-- around after @maxBound :: Int@, which is not expected to matter in practice.
newTag :: CompletionQueue -> IO C.Tag
newTag cq = fmap toTag (atomicModifyIORef' (nextTag cq) bump)
  where
    -- Store the successor, hand back the value that was current.
    bump current = (current + 1, current)
    toTag = C.Tag . plusPtr nullPtr
-- | Run an action that pushes work onto, or plucks results from, the given
-- 'CompletionQueue', holding a slot in the matching in-flight counter for the
-- duration of the action.
--
-- * If the queue is already shutting down, the action is not run and
--   @Left GRPCIOShutdown@ is returned.
-- * Otherwise this blocks (via STM 'retry') until the counter for the given
--   'CQOpType' is below its limit, increments it, runs the action, and
--   decrements the counter again, even if the action throws.
withPermission :: CQOpType
               -> CompletionQueue
               -> IO (Either GRPCIOError a)
               -> IO (Either GRPCIOError a)
withPermission op cq f = bracket acquire release run
  where
    counter = getCount op cq

    -- Yields True when a slot was taken, False when the queue is shutting
    -- down. The shutdown flag is consulted before the counter.
    acquire = atomically $ do
      closing <- readTVar (shuttingDown cq)
      if closing
        then return False
        else do
          inFlight <- readTVar counter
          if inFlight < getLimit op
            then modifyTVar' counter (+ 1) >> return True
            else retry

    run True = f
    run False = return $ Left GRPCIOShutdown

    -- Only give the slot back if one was actually taken.
    release True = atomically $ modifyTVar' counter (subtract 1)
    release False = return ()
-- | Wait up to the given number of seconds for the given tag to appear on the
-- completion queue.
--
-- The call runs under 'withPermission' 'Pluck', so it returns
-- @Left GRPCIOShutdown@ without plucking if the queue is shutting down. A
-- successful event yields @Right ()@; any other event is translated with
-- 'eventToError'.
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
      -> IO (Either GRPCIOError ())
pluck cq tag waitSeconds = do
  grpcDebug $ concat
    [ "pluck: called with tag: ", show tag
    , " and wait: ", show waitSeconds
    ]
  withPermission Pluck cq $ C.withDeadlineSeconds waitSeconds awaitTag
  where
    -- Perform the underlying pluck against the raw queue with the deadline
    -- supplied by 'C.withDeadlineSeconds'.
    awaitTag deadline = do
      ev <- C.grpcCompletionQueuePluck (unsafeCQ cq) tag deadline C.reserved
      grpcDebug $ "pluck: finished. Event: " ++ show ev
      return (toResult ev)

    toResult ev
      | isEventSuccessful ev = Right ()
      | otherwise = eventToError ev
-- | Convert a 'C.Event' into the corresponding 'GRPCIOError', wrapped in
-- 'Left'.
--
-- Only the completion type is inspected: a queue shutdown maps to
-- 'GRPCIOShutdown', a queue timeout to 'GRPCIOTimeout', and anything else
-- (including a successful event) to 'GRPCIOUnknownError'. Callers must
-- therefore check 'isEventSuccessful' first.
eventToError :: C.Event -> Either GRPCIOError a
eventToError (C.Event completionType _ _) =
  case completionType of
    C.QueueShutdown -> Left GRPCIOShutdown
    C.QueueTimeout -> Left GRPCIOTimeout
    _ -> Left GRPCIOUnknownError
-- | Whether the given event reports success: its completion type is
-- 'C.OpComplete' and its success flag is 'True'.
isEventSuccessful :: C.Event -> Bool
isEventSuccessful ev =
  case ev of
    C.Event C.OpComplete succeeded _ -> succeeded
    _ -> False
-- | Upper bound on the number of concurrent 'Push' operations, as reported by
-- 'getLimit'.
--
-- TODO: figure out what this should be.
maxWorkPushers :: Int
maxWorkPushers = 100
-- | Select the in-flight counter of a 'CompletionQueue' that tracks the given
-- kind of operation.
getCount :: CQOpType -> CompletionQueue -> TVar Int
getCount op =
  case op of
    Push -> currentPushers
    Pluck -> currentPluckers
-- | The maximum number of concurrent operations of the given kind:
-- 'maxWorkPushers' for 'Push' and 'C.maxCompletionQueuePluckers' for 'Pluck'.
getLimit :: CQOpType -> Int
getLimit op =
  case op of
    Push -> maxWorkPushers
    Pluck -> C.maxCompletionQueuePluckers