diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 0ec0e2f..9e726a9 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -56,7 +56,7 @@ library , grpc/impl/codegen/slice.h build-tools: c2hs default-language: Haskell2010 - ghc-options: -Wall -fwarn-incomplete-patterns + ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind include-dirs: include hs-source-dirs: src default-extensions: CPP @@ -80,7 +80,7 @@ test-suite test LowLevelTests, UnsafeTests default-language: Haskell2010 - ghc-options: -Wall -fwarn-incomplete-patterns -g -threaded + ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind -g -threaded hs-source-dirs: tests main-is: Properties.hs type: exitcode-stdio-1.0 diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index 15559db..ac8663a 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -107,3 +107,9 @@ destroyCall call@ServerCall{..} = do forM_ callDetails C.destroyCallDetails grpcDebug $ "destroying deadline." ++ show callDeadline forM_ callDeadline C.timespecDestroy + +_nowarn_unused :: a +_nowarn_unused = + castPtr `undefined` + (peek :: Ptr Int -> IO Int) `undefined` + () diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index b7af4c4..3310734 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -6,8 +6,8 @@ {-# LANGUAGE RecordWildCards #-} -module Network.GRPC.LowLevel.CompletionQueue ( - CompletionQueue +module Network.GRPC.LowLevel.CompletionQueue + ( CompletionQueue , withCompletionQueue , createCompletionQueue , shutdownCompletionQueue @@ -16,33 +16,35 @@ module Network.GRPC.LowLevel.CompletionQueue ( , channelCreateRegisteredCall , channelCreateCall , TimeoutSeconds - , eventSuccess + , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify , serverRequestRegisteredCall , serverRequestCall , newTag -) where + ) +where -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, retry, check) -import Control.Concurrent.STM.TVar (TVar, newTVarIO, modifyTVar', - readTVar, writeTVar) -import Control.Exception (bracket) -import Data.IORef (IORef, newIORef, atomicModifyIORef') -import Data.List (intersperse) -import Foreign.Marshal.Alloc (malloc, free) -import Foreign.Ptr (nullPtr, plusPtr) -import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.STM (atomically, check, retry) +import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, + readTVar, writeTVar) +import Control.Exception (bracket) +import Data.IORef (IORef, atomicModifyIORef', + newIORef) +import Data.List (intersperse) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Ptr (nullPtr, plusPtr) +import Foreign.Storable (peek) +import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Time as C -import qualified Network.GRPC.Unsafe.Op as C -import qualified Network.GRPC.Unsafe.Metadata as C -import System.Timeout (timeout) +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Time as C +import System.Timeout (timeout) -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.GRPC -- NOTE: the concurrency requirements for a CompletionQueue are a little -- complicated. There are two read operations: next and pluck. We can either @@ -69,7 +71,7 @@ import Network.GRPC.LowLevel.Call -- are used to wait for batches gRPC operations ('Op's) to finish running, as -- well as wait for various other operations, such as server shutdown, pinging, -- checking to see if we've been disconnected, and so forth. -data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, +data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, -- ^ All access to this field must be -- guarded by a check of 'shuttingDown'. currentPluckers :: TVar Int, @@ -78,15 +80,15 @@ data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, -- queue. -- The max value is set by gRPC in -- 'C.maxCompletionQueuePluckers' - currentPushers :: TVar Int, + currentPushers :: TVar Int, -- ^ Used to prevent new work from -- being pushed onto the queue when -- the queue begins to shut down. - shuttingDown :: TVar Bool, + shuttingDown :: TVar Bool, -- ^ Used to prevent new pluck calls on -- the queue when the queue begins to -- shut down. - nextTag :: IORef Int + nextTag :: IORef Int -- ^ Used to supply unique tags for work -- items pushed onto the queue. } @@ -159,13 +161,14 @@ eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout eventToError _ = Left GRPCIOUnknownError -isFailedEvent :: C.Event -> Bool -isFailedEvent C.Event{..} = (eventCompletionType /= C.OpComplete) - || not eventSuccess +-- | Returns true iff the given grpc_event was a success. +isEventSuccessful :: C.Event -> Bool +isEventSuccessful (C.Event C.OpComplete True _) = True +isEventSuccessful _ = False -- | Waits for the given number of seconds for the given tag to appear on the -- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting ---down and cannot handle new requests. +-- down and cannot handle new requests. pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds -> IO (Either GRPCIOError ()) pluck cq@CompletionQueue{..} tag waitSeconds = do @@ -175,9 +178,7 @@ pluck cq@CompletionQueue{..} tag waitSeconds = do C.withDeadlineSeconds waitSeconds $ \deadline -> do ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved grpcDebug $ "pluck: finished. Event: " ++ show ev - if isFailedEvent ev - then return $ eventToError ev - else return $ Right () + return $ if isEventSuccessful ev then Right () else eventToError ev -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the @@ -207,7 +208,7 @@ shutdownCompletionQueue (CompletionQueue{..}) = do atomically $ readTVar currentPluckers >>= \x -> check (x == 0) --drain the queue C.grpcCompletionQueueShutdown unsafeCQ - loopRes <- timeout (5*10^6) drainLoop + loopRes <- timeout (5*10^(6::Int)) drainLoop case loopRes of Nothing -> return $ Left GRPCIOShutdownFailure Just () -> C.grpcCompletionQueueDestroy unsafeCQ >> return (Right ()) @@ -221,11 +222,6 @@ shutdownCompletionQueue (CompletionQueue{..}) = do C.QueueTimeout -> drainLoop C.OpComplete -> drainLoop --- | Returns true iff the given grpc_event was a success. -eventSuccess :: C.Event -> Bool -eventSuccess (C.Event C.OpComplete True _) = True -eventSuccess _ = False - channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask -> CompletionQueue -> C.CallHandle -> C.CTimeSpecPtr -> IO (Either GRPCIOError Call) @@ -290,7 +286,7 @@ serverRequestRegisteredCall return $ Right assembledCall -- TODO: see TODO for failureCleanup in serverRequestCall. where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do - threadDelay (30*10^6) + threadDelaySecs 30 grpcDebug "serverRequestRegisteredCall: doing delayed cleanup." C.timespecDestroy deadline free callPtr @@ -340,7 +336,7 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit = -- we sleep for a while before freeing the objects. We should find a -- permanent solution that's more robust. where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do - threadDelay (30*10^6) + threadDelaySecs 30 grpcDebug "serverRequestCall: doing delayed cleanup." free callPtr C.destroyCallDetails callDetails @@ -356,3 +352,6 @@ serverRegisterCompletionQueue server CompletionQueue{..} = serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify server CompletionQueue{..} tag = C.grpcServerShutdownAndNotify server unsafeCQ tag + +threadDelaySecs :: Int -> IO () +threadDelaySecs = threadDelay . (* 10^(6::Int)) diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index 4d7bd8f..dedf1ba 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -51,7 +51,7 @@ grpcDebug :: String -> IO () grpcDebug str = do tid <- myThreadId putStrLn $ (show tid) ++ ": " ++ str #else -grpcDebug str = return () +grpcDebug _ = return () #endif {- diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 7a82e08..938f2de 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -1,26 +1,27 @@ -{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} module Network.GRPC.LowLevel.Op where import Control.Exception -import qualified Data.ByteString as B -import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) -import Data.String (IsString) -import Foreign.C.String (CString) -import Foreign.C.Types (CInt) -import Foreign.Marshal.Alloc (malloc, mallocBytes, free) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek, poke) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Op as C +import qualified Data.ByteString as B +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) +import Data.String (IsString) +import Foreign.C.String (CString) +import Foreign.C.Types (CInt) +import Foreign.Marshal.Alloc (free, malloc, + mallocBytes) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek, poke) +import qualified Network.GRPC.Unsafe as C () +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.CompletionQueue -import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.GRPC type MetadataMap = M.Map B.ByteString B.ByteString @@ -224,3 +225,6 @@ runOps call cq ops timeLimit = grpcDebug "runOps: got good op; starting." fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err + +_nowarn_unused :: a +_nowarn_unused = undefined nullPtr diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index d1d95b1..d541ac1 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -2,26 +2,31 @@ module Network.GRPC.LowLevel.Server where -import Control.Concurrent (threadDelay) -import Control.Exception (bracket, finally) +import Control.Concurrent (threadDelay) +import Control.Exception (bracket, finally) import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.Map as M -import Foreign.Ptr (nullPtr) -import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C +import Data.ByteString (ByteString) +import qualified Data.Map as M +import Foreign.Ptr (nullPtr) +import Foreign.Storable (peek) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, - pluck, serverRegisterCompletionQueue, serverShutdownAndNotify, - createCompletionQueue, shutdownCompletionQueue, TimeoutSeconds, - serverRequestRegisteredCall, serverRequestCall) import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, + TimeoutSeconds, + createCompletionQueue, + pluck, + serverRegisterCompletionQueue, + serverRequestCall, + serverRequestRegisteredCall, + serverShutdownAndNotify, + shutdownCompletionQueue) +import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C -- | Wraps various gRPC state needed to run a server. data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue, @@ -30,10 +35,10 @@ data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue, -- | Configuration needed to start a server. There might be more fields that -- need to be added to this in the future. data ServerConfig = - ServerConfig {hostName :: Host, + ServerConfig {hostName :: Host, -- ^ Name of the host the server is running on. Not sure -- how this is used. Setting to "localhost" works fine in tests. - port :: Int, + port :: Int, -- ^ Port to listen for requests on. methodsToRegister :: [(MethodName, Host, GRPCMethodType)] -- ^ List of (method name, method host, method type) tuples @@ -199,7 +204,7 @@ serverHandleNormalRegisteredCall :: Server -- metadata and returns a response body and -- metadata. -> IO (Either GRPCIOError ()) -serverHandleNormalRegisteredCall s@Server{..} rm timeLimit initMetadata f = do +serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit. -- Should we just hard-code time limits instead? Not sure if client -- programmer cares, since this function will likely just be put in a loop @@ -229,16 +234,16 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit initMetadata f = do -- | Handle one unregistered call. serverHandleNormalCall :: Server -> TimeoutSeconds -> MetadataMap - -- ^ Initial metadata. + -- ^ Initial server metadata. -> (ByteString -> MetadataMap -> IO (ByteString, MetadataMap, StatusDetails)) -- ^ Handler function takes a request body and -- metadata and returns a response body and metadata. -> IO (Either GRPCIOError ()) -serverHandleNormalCall s@Server{..} timeLimit initMetadata f = do +serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do grpcDebug "serverHandleNormalCall: starting batch." - let recvOps = serverOpsGetNormalCall initMetadata + let recvOps = serverOpsGetNormalCall srvMetadata opResults <- runOps call serverCQ recvOps timeLimit case opResults of Left x -> return $ Left x @@ -255,3 +260,6 @@ serverHandleNormalCall s@Server{..} timeLimit initMetadata f = do Right _ -> grpcDebug "serverHandleNormalCall: ops done." >> return (Right ()) x -> error $ "impossible pattern match: " ++ show x + +_nowarn_unused :: a +_nowarn_unused = undefined threadDelay