diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index eb76638..6375ae2 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -45,8 +45,11 @@ library Network.GRPC.LowLevel.Server.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue + Network.GRPC.LowLevel.CompletionQueue.Internal + Network.GRPC.LowLevel.CompletionQueue.Unregistered Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op + Network.GRPC.LowLevel.Op.Unregistered Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index af91fbb..7bab209 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -50,7 +50,6 @@ GRPC -- * Ops , runClientOps , runServerRegOps -, runServerUnregOps , Op(..) , OpRecvResult(..) @@ -63,7 +62,6 @@ import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.Unsafe (ConnectivityState(..)) import Network.GRPC.Unsafe.Op (StatusCode(..)) diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs index 9577da0..7f5fa33 100644 --- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -2,17 +2,18 @@ module Network.GRPC.LowLevel.Client.Unregistered where -import Control.Exception (finally) -import Control.Monad (join) -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Time as C +import Control.Exception (finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Client -import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op @@ -26,8 +27,8 @@ clientCreateCall :: Client clientCreateCall Client{..} meth timeout = do let parentCall = C.Call nullPtr C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateCall clientChannel parentCall C.propagateDefaults - clientCQ meth (clientEndpoint clientConfig) deadline + U.channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ meth (clientEndpoint clientConfig) deadline withClientCall :: Client -> MethodName diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 44ebddb..42aaaff 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -14,30 +14,23 @@ module Network.GRPC.LowLevel.CompletionQueue , pluck , startBatch , channelCreateRegisteredCall - , channelCreateCall , TimeoutSeconds , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify , serverRequestRegisteredCall - , serverRequestCall , newTag ) where -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, check, - retry) -import Control.Concurrent.STM.TVar (TVar, modifyTVar', - newTVarIO, readTVar, +import Control.Concurrent (forkIO) +import Control.Concurrent.STM (atomically, check) +import Control.Concurrent.STM.TVar (newTVarIO, readTVar, writeTVar) import Control.Exception (bracket) -import Data.IORef (IORef, - atomicModifyIORef', - newIORef) +import Data.IORef (newIORef) import Data.List (intersperse) import Foreign.Marshal.Alloc (free, malloc) -import Foreign.Ptr (nullPtr, plusPtr) import Foreign.Storable (peek) import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.Constants as C @@ -47,100 +40,8 @@ import qualified Network.GRPC.Unsafe.Time as C import System.Timeout (timeout) import Network.GRPC.LowLevel.Call -import qualified Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.GRPC - --- NOTE: the concurrency requirements for a CompletionQueue are a little --- complicated. There are two read operations: next and pluck. We can either --- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times --- concurrently, but we can't mix next and pluck calls. Fortunately, we only --- need to use next when we are shutting down the queue. Thus, we do two things --- to shut down: --- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck --- calls will be allowed to start. --- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. --- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. - --- NOTE: There is one more possible race condition: pushing work onto the queue --- after we begin to shut down. --- Solution: another counter, which must reach zero before the shutdown --- can start. - --- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of --- concurrent pushers to the CQ, but I don't know what the limit should be set --- to. I haven't found any documentation that suggests there is a limit imposed --- by the gRPC library, but there might be. Need to investigate further. - --- | Wraps the state necessary to use a gRPC completion queue. Completion queues --- are used to wait for batches gRPC operations ('Op's) to finish running, as --- well as wait for various other operations, such as server shutdown, pinging, --- checking to see if we've been disconnected, and so forth. -data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, - -- ^ All access to this field must be - -- guarded by a check of 'shuttingDown'. - currentPluckers :: TVar Int, - -- ^ Used to limit the number of - -- concurrent calls to pluck on this - -- queue. - -- The max value is set by gRPC in - -- 'C.maxCompletionQueuePluckers' - currentPushers :: TVar Int, - -- ^ Used to prevent new work from - -- being pushed onto the queue when - -- the queue begins to shut down. - shuttingDown :: TVar Bool, - -- ^ Used to prevent new pluck calls on - -- the queue when the queue begins to - -- shut down. - nextTag :: IORef Int - -- ^ Used to supply unique tags for work - -- items pushed onto the queue. - } - --- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. --- This will eventually wrap around after reaching @maxBound :: Int@, but from a --- practical perspective, that should be safe. -newTag :: CompletionQueue -> IO C.Tag -newTag CompletionQueue{..} = do - i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) - return $ C.Tag $ plusPtr nullPtr i - -maxWorkPushers :: Int -maxWorkPushers = 100 --TODO: figure out what this should be. - -data CQOpType = Push | Pluck deriving (Show, Eq, Enum) - -getCount :: CQOpType -> CompletionQueue -> TVar Int -getCount Push = currentPushers -getCount Pluck = currentPluckers - -getLimit :: CQOpType -> Int -getLimit Push = maxWorkPushers -getLimit Pluck = C.maxCompletionQueuePluckers - --- | Safely brackets an operation that pushes work onto or plucks results from --- the given 'CompletionQueue'. -withPermission :: CQOpType - -> CompletionQueue - -> IO (Either GRPCIOError a) - -> IO (Either GRPCIOError a) -withPermission op cq f = - bracket acquire release doOp - where acquire = atomically $ do - isShuttingDown <- readTVar (shuttingDown cq) - if isShuttingDown - then return False - else do currCount <- readTVar $ getCount op cq - if currCount < getLimit op - then modifyTVar' (getCount op cq) (+1) >> return True - else retry - doOp gotResource = if gotResource - then f - else return $ Left GRPCIOShutdown - release gotResource = - if gotResource - then atomically $ modifyTVar' (getCount op cq) (subtract 1) - else return () +import Network.GRPC.LowLevel.CompletionQueue.Internal withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue grpc = bracket (createCompletionQueue grpc) @@ -155,35 +56,6 @@ createCompletionQueue _ = do nextTag <- newIORef minBound return $ CompletionQueue{..} -type TimeoutSeconds = Int - --- | Translate 'C.Event' to an error. The caller is responsible for ensuring --- that the event actually corresponds to an error condition; a successful event --- will be translated to a 'GRPCIOUnknownError'. -eventToError :: C.Event -> (Either GRPCIOError a) -eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown -eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout -eventToError _ = Left GRPCIOUnknownError - --- | Returns true iff the given grpc_event was a success. -isEventSuccessful :: C.Event -> Bool -isEventSuccessful (C.Event C.OpComplete True _) = True -isEventSuccessful _ = False - --- | Waits for the given number of seconds for the given tag to appear on the --- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting --- down and cannot handle new requests. -pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds - -> IO (Either GRPCIOError ()) -pluck cq@CompletionQueue{..} tag waitSeconds = do - grpcDebug $ "pluck: called with tag: " ++ show tag - ++ " and wait: " ++ show waitSeconds - withPermission Pluck cq $ do - C.withDeadlineSeconds waitSeconds $ \deadline -> do - ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved - grpcDebug $ "pluck: finished. Event: " ++ show ev - return $ if isEventSuccessful ev then Right () else eventToError ev - -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the -- case, switch these to 'Maybe'. @@ -241,20 +113,6 @@ channelCreateRegisteredCall handle deadline C.reserved return $ Right $ ClientCall call -channelCreateCall :: C.Channel - -> C.Call - -> C.PropagationMask - -> CompletionQueue - -> MethodName - -> Endpoint - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = - withPermission Push cq $ do - call <- C.grpcChannelCreateCall chan parent mask unsafeCQ - (unMethodName meth) (unEndpoint endpt) deadline C.reserved - return $ Right $ ClientCall call - -- | Create the call object to handle a registered call. serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds -> RegisteredMethod -> MetadataMap @@ -309,54 +167,6 @@ serverRequestRegisteredCall C.metadataArrayDestroy metadataArrayPtr free bbPtr -serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError U.ServerCall) -serverRequestCall server cq@CompletionQueue{..} timeLimit = - withPermission Push cq $ do - callPtr <- malloc - grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr - callDetails <- C.createCallDetails - metadataArrayPtr <- C.metadataArrayCreate - metadataArray <- peek metadataArrayPtr - tag <- newTag cq - callError <- C.grpcServerRequestCall server callPtr callDetails - metadataArray unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestCall: callError was " ++ show callError - if callError /= C.CallOk - then do grpcDebug "serverRequestCall: got call error; cleaning up." - failureCleanup callPtr callDetails metadataArrayPtr - return $ Left $ GRPCIOCallError callError - else do pluckResult <- pluck cq tag timeLimit - grpcDebug $ "serverRequestCall: pluckResult was " - ++ show pluckResult - case pluckResult of - Left x -> do - grpcDebug "serverRequestCall: pluck error; cleaning up." - failureCleanup callPtr callDetails - metadataArrayPtr - return $ Left x - Right () -> do - rawCall <- peek callPtr - let call = U.ServerCall rawCall - metadataArrayPtr - Nothing - callDetails - return $ Right call - - --TODO: the gRPC library appears to hold onto these pointers for a random - -- amount of time, even after returning from the only call that uses them. - -- This results in malloc errors if - -- gRPC tries to modify them after we free them. To work around it, - -- we sleep for a while before freeing the objects. We should find a - -- permanent solution that's more robust. - where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do - threadDelaySecs 30 - grpcDebug "serverRequestCall: doing delayed cleanup." - free callPtr - C.destroyCallDetails callDetails - C.metadataArrayDestroy metadataArrayPtr - return () - -- | Register the server's completion queue. Must be done before the server is -- started. serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO () @@ -366,6 +176,3 @@ serverRegisterCompletionQueue server CompletionQueue{..} = serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify server CompletionQueue{..} tag = C.grpcServerShutdownAndNotify server unsafeCQ tag - -threadDelaySecs :: Int -> IO () -threadDelaySecs = threadDelay . (* 10^(6::Int)) diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs new file mode 100644 index 0000000..1facf67 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs @@ -0,0 +1,134 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Internal where + +import Control.Concurrent.STM (atomically, retry) +import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar) +import Control.Exception (bracket) +import Data.IORef (IORef, atomicModifyIORef') +import Foreign.Ptr (nullPtr, plusPtr) +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C + +-- NOTE: the concurrency requirements for a CompletionQueue are a little +-- complicated. There are two read operations: next and pluck. We can either +-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times +-- concurrently, but we can't mix next and pluck calls. Fortunately, we only +-- need to use next when we are shutting down the queue. Thus, we do two things +-- to shut down: +-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck +-- calls will be allowed to start. +-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. +-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. + +-- NOTE: There is one more possible race condition: pushing work onto the queue +-- after we begin to shut down. +-- Solution: another counter, which must reach zero before the shutdown +-- can start. + +-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of +-- concurrent pushers to the CQ, but I don't know what the limit should be set +-- to. I haven't found any documentation that suggests there is a limit imposed +-- by the gRPC library, but there might be. Need to investigate further. + +-- | Wraps the state necessary to use a gRPC completion queue. Completion queues +-- are used to wait for batches gRPC operations ('Op's) to finish running, as +-- well as wait for various other operations, such as server shutdown, pinging, +-- checking to see if we've been disconnected, and so forth. +data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, + -- ^ All access to this field must be + -- guarded by a check of 'shuttingDown'. + currentPluckers :: TVar Int, + -- ^ Used to limit the number of + -- concurrent calls to pluck on this + -- queue. + -- The max value is set by gRPC in + -- 'C.maxCompletionQueuePluckers' + currentPushers :: TVar Int, + -- ^ Used to prevent new work from + -- being pushed onto the queue when + -- the queue begins to shut down. + shuttingDown :: TVar Bool, + -- ^ Used to prevent new pluck calls on + -- the queue when the queue begins to + -- shut down. + nextTag :: IORef Int + -- ^ Used to supply unique tags for work + -- items pushed onto the queue. + } + +type TimeoutSeconds = Int + +data CQOpType = Push | Pluck deriving (Show, Eq, Enum) + +-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. +-- This will eventually wrap around after reaching @maxBound :: Int@, but from a +-- practical perspective, that should be safe. +newTag :: CompletionQueue -> IO C.Tag +newTag CompletionQueue{..} = do + i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) + return $ C.Tag $ plusPtr nullPtr i + +-- | Safely brackets an operation that pushes work onto or plucks results from +-- the given 'CompletionQueue'. +withPermission :: CQOpType + -> CompletionQueue + -> IO (Either GRPCIOError a) + -> IO (Either GRPCIOError a) +withPermission op cq f = + bracket acquire release doOp + where acquire = atomically $ do + isShuttingDown <- readTVar (shuttingDown cq) + if isShuttingDown + then return False + else do currCount <- readTVar $ getCount op cq + if currCount < getLimit op + then modifyTVar' (getCount op cq) (+1) >> return True + else retry + doOp gotResource = if gotResource + then f + else return $ Left GRPCIOShutdown + release gotResource = + if gotResource + then atomically $ modifyTVar' (getCount op cq) (subtract 1) + else return () + +-- | Waits for the given number of seconds for the given tag to appear on the +-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting +-- down and cannot handle new requests. +pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds + -> IO (Either GRPCIOError ()) +pluck cq@CompletionQueue{..} tag waitSeconds = do + grpcDebug $ "pluck: called with tag: " ++ show tag + ++ " and wait: " ++ show waitSeconds + withPermission Pluck cq $ do + C.withDeadlineSeconds waitSeconds $ \deadline -> do + ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved + grpcDebug $ "pluck: finished. Event: " ++ show ev + return $ if isEventSuccessful ev then Right () else eventToError ev + +-- | Translate 'C.Event' to an error. The caller is responsible for ensuring +-- that the event actually corresponds to an error condition; a successful event +-- will be translated to a 'GRPCIOUnknownError'. +eventToError :: C.Event -> (Either GRPCIOError a) +eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown +eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout +eventToError _ = Left GRPCIOUnknownError + +-- | Returns true iff the given grpc_event was a success. +isEventSuccessful :: C.Event -> Bool +isEventSuccessful (C.Event C.OpComplete True _) = True +isEventSuccessful _ = False + +maxWorkPushers :: Int +maxWorkPushers = 100 --TODO: figure out what this should be. + +getCount :: CQOpType -> CompletionQueue -> TVar Int +getCount Push = currentPushers +getCount Pluck = currentPluckers + +getLimit :: CQOpType -> Int +getLimit Push = maxWorkPushers +getLimit Pluck = C.maxCompletionQueuePluckers diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs new file mode 100644 index 0000000..40ea875 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -0,0 +1,78 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Unregistered where + +import Control.Concurrent (forkIO) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call +import qualified Network.GRPC.LowLevel.Call.Unregistered as U +import Network.GRPC.LowLevel.CompletionQueue.Internal +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C + +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> MethodName + -> Endpoint + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = + withPermission Push cq $ do + call <- C.grpcChannelCreateCall chan parent mask unsafeCQ + (unMethodName meth) (unEndpoint endpt) deadline C.reserved + return $ Right $ ClientCall call + + +serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds + -> IO (Either GRPCIOError U.ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit = + withPermission Push cq $ do + callPtr <- malloc + grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr + callDetails <- C.createCallDetails + metadataArrayPtr <- C.metadataArrayCreate + metadataArray <- peek metadataArrayPtr + tag <- newTag cq + callError <- C.grpcServerRequestCall server callPtr callDetails + metadataArray unsafeCQ unsafeCQ tag + grpcDebug $ "serverRequestCall: callError was " ++ show callError + if callError /= C.CallOk + then do grpcDebug "serverRequestCall: got call error; cleaning up." + failureCleanup callPtr callDetails metadataArrayPtr + return $ Left $ GRPCIOCallError callError + else do pluckResult <- pluck cq tag timeLimit + grpcDebug $ "serverRequestCall: pluckResult was " + ++ show pluckResult + case pluckResult of + Left x -> do + grpcDebug "serverRequestCall: pluck error; cleaning up." + failureCleanup callPtr callDetails + metadataArrayPtr + return $ Left x + Right () -> do + rawCall <- peek callPtr + let call = U.ServerCall rawCall + metadataArrayPtr + Nothing + callDetails + return $ Right call + + --TODO: the gRPC library appears to hold onto these pointers for a random + -- amount of time, even after returning from the only call that uses them. + -- This results in malloc errors if + -- gRPC tries to modify them after we free them. To work around it, + -- we sleep for a while before freeing the objects. We should find a + -- permanent solution that's more robust. + where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do + threadDelaySecs 30 + grpcDebug "serverRequestCall: doing delayed cleanup." + free callPtr + C.destroyCallDetails callDetails + C.metadataArrayDestroy metadataArrayPtr + return () diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index 65529d6..aa4106f 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -3,12 +3,7 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Network.GRPC.LowLevel.GRPC where -{- --- TODO: remove if not needed -import Control.Monad.IO.Class (liftIO, MonadIO) -import Control.Monad.Except (ExceptT(..), runExceptT, throwError, - MonadError) --} +import Control.Concurrent (threadDelay) import Control.Exception import qualified Data.ByteString as B import qualified Data.Map as M @@ -63,6 +58,9 @@ grpcDebug str = do tid <- myThreadId grpcDebug _ = return () #endif +threadDelaySecs :: Int -> IO () +threadDelaySecs = threadDelay . (* 10^(6::Int)) + {- -- TODO: remove this once finally decided on whether to use it. -- | Monad for running gRPC operations. diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 9d71bb1..4950338 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -7,7 +7,6 @@ import Control.Exception import qualified Data.ByteString as B import qualified Data.Map.Strict as M import Data.Maybe (catMaybes) -import Data.String (IsString) import Foreign.C.String (CString) import Foreign.C.Types (CInt) import Foreign.Marshal.Alloc (free, malloc, @@ -20,7 +19,6 @@ import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC @@ -224,7 +222,7 @@ runOps call cq ops timeLimit = -- | For a given call, run the given 'Op's on the given completion queue with -- the given tag. Blocks until the ops are complete or the given number of -- seconds have elapsed. --- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we +-- TODO: now that 'ServerRegCall' and 'U.ServerCall' are separate types, we -- could try to limit the input 'Op's more appropriately. E.g., we don't use -- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC -- handles that for us. @@ -243,13 +241,6 @@ runServerRegOps :: ServerRegCall -> IO (Either GRPCIOError [OpRecvResult]) runServerRegOps = runOps . internalServerRegCall -runServerUnregOps :: U.ServerCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runServerUnregOps = runOps . U.internalServerCall - -- | Like 'runServerOps', but for client-side calls. runClientOps :: ClientCall -> CompletionQueue @@ -263,6 +254,6 @@ runClientOps = runOps . internalClientCall extractStatusInfo :: [OpRecvResult] -> Maybe (MetadataMap, C.StatusCode, B.ByteString) extractStatusInfo [] = Nothing -extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = +extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) = Just (meta, code, details) extractStatusInfo (_:xs) = extractStatusInfo xs diff --git a/src/Network/GRPC/LowLevel/Op/Unregistered.hs b/src/Network/GRPC/LowLevel/Op/Unregistered.hs new file mode 100644 index 0000000..c41b8a1 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Op/Unregistered.hs @@ -0,0 +1,13 @@ +module Network.GRPC.LowLevel.Op.Unregistered where + +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.Call.Unregistered as U + +runServerOps :: U.ServerCall + -> CompletionQueue + -> [Op] + -> TimeoutSeconds + -> IO (Either GRPCIOError [OpRecvResult]) +runServerOps = runOps . U.internalServerCall diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 6b269c3..f34d676 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -15,7 +15,6 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, createCompletionQueue, pluck, serverRegisterCompletionQueue, - serverRequestCall, serverRequestRegisteredCall, serverShutdownAndNotify, shutdownCompletionQueue) diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 0c8273f..1a007d4 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -2,21 +2,22 @@ module Network.GRPC.LowLevel.Server.Unregistered where -import Control.Exception (finally) -import Data.ByteString (ByteString) -import Network.GRPC.LowLevel.Call (MethodName) +import Control.Exception (finally) +import Data.ByteString (ByteString) +import Network.GRPC.LowLevel.Call (MethodName) import Network.GRPC.LowLevel.Call.Unregistered -import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds, - serverRequestCall) +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.Op +import Network.GRPC.LowLevel.Op (OpRecvResult (..)) +import qualified Network.GRPC.LowLevel.Op.Unregistered as U import Network.GRPC.LowLevel.Server -import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server -> TimeoutSeconds -> IO (Either GRPCIOError ServerCall) serverCreateCall Server{..} timeLimit = - serverRequestCall internalServer serverCQ timeLimit + U.serverRequestCall internalServer serverCQ timeLimit withServerCall :: Server -> TimeoutSeconds -> (ServerCall -> IO (Either GRPCIOError a)) @@ -45,7 +46,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do grpcDebug "serverHandleCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- runServerUnregOps call serverCQ recvOps timeLimit + opResults <- U.runServerOps call serverCQ recvOps timeLimit case opResults of Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" return $ Left x @@ -59,7 +60,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do let status = C.GrpcStatusOk let respOps = serverOpsSendNormalResponse respBody respMetadata status details - respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit + respOpsResults <- U.runServerOps call serverCQ respOps timeLimit case respOpsResults of Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." return $ Left x