From 2119ef4b168b1bcf735d8a638a115a425f14d82c Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 10:48:28 -0500 Subject: [PATCH 1/9] Group client and server tests together --- tests/LowLevelTests.hs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index d2d5d90..361bf33 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -19,11 +19,11 @@ lowLevelTests :: TestTree lowLevelTests = testGroup "Unit tests of low-level Haskell library" [ testGRPCBracket , testCompletionQueueCreateDestroy - , testServerCreateDestroy , testClientCreateDestroy - , testWithServerCall - , testWithClientCall + , testClientCall , testClientTimeoutNoServer + , testServerCreateDestroy + , testServerCall , testServerTimeoutNoClient -- , testWrongEndpoint , testPayload @@ -39,22 +39,12 @@ testCompletionQueueCreateDestroy = testCase "Create/destroy CQ" $ withGRPC $ \g -> withCompletionQueue g nop -testServerCreateDestroy :: TestTree -testServerCreateDestroy = - serverOnlyTest "start/stop" [] nop - testClientCreateDestroy :: TestTree testClientCreateDestroy = clientOnlyTest "start/stop" nop -testWithServerCall :: TestTree -testWithServerCall = - serverOnlyTest "create/destroy call" [] $ \s -> do - r <- withServerUnregCall s 1 $ const $ return $ Right () - r @?= Left GRPCIOTimeout - -testWithClientCall :: TestTree -testWithClientCall = +testClientCall :: TestTree +testClientCall = clientOnlyTest "create/destroy call" $ \c -> do r <- withClientCall c "foo" 10 $ const $ return $ Right () r @?= Right () @@ -66,6 +56,16 @@ testClientTimeoutNoServer = r <- clientRegisteredRequest c rm 1 "Hello" mempty r @?= Left GRPCIOTimeout +testServerCreateDestroy :: TestTree +testServerCreateDestroy = + serverOnlyTest "start/stop" [] nop + +testServerCall :: TestTree +testServerCall = + serverOnlyTest "create/destroy call" [] $ \s -> do + r <- withServerUnregCall s 1 $ const $ return $ Right () + r @?= Left GRPCIOTimeout + testServerTimeoutNoClient :: TestTree testServerTimeoutNoClient = serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do From 386568463a331465e7648817fb98b24acdbf22e1 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 11:18:43 -0500 Subject: [PATCH 2/9] Split off support for unregistered calls to an *.Unregistered module namespace --- grpc-haskell.cabal | 3 + src/Network/GRPC/LowLevel.hs | 3 + src/Network/GRPC/LowLevel/Call.hs | 81 +++------------- .../GRPC/LowLevel/Call/Unregistered.hs | 75 +++++++++++++++ src/Network/GRPC/LowLevel/Client.hs | 73 +++----------- .../GRPC/LowLevel/Client/Unregistered.hs | 66 +++++++++++++ src/Network/GRPC/LowLevel/CompletionQueue.hs | 38 ++++---- src/Network/GRPC/LowLevel/Op.hs | 29 +++--- src/Network/GRPC/LowLevel/Server.hs | 95 ++++--------------- .../GRPC/LowLevel/Server/Unregistered.hs | 78 +++++++++++++++ 10 files changed, 305 insertions(+), 236 deletions(-) create mode 100644 src/Network/GRPC/LowLevel/Call/Unregistered.hs create mode 100644 src/Network/GRPC/LowLevel/Client/Unregistered.hs create mode 100644 src/Network/GRPC/LowLevel/Server/Unregistered.hs diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 72373f8..0386528 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -47,8 +47,11 @@ library Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op Network.GRPC.LowLevel.Server + Network.GRPC.LowLevel.Server.Unregistered Network.GRPC.LowLevel.Call + Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Client + Network.GRPC.LowLevel.Client.Unregistered extra-libraries: grpc includes: diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index aa96213..26b8f52 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -61,10 +61,13 @@ GRPC import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Server +import Network.GRPC.LowLevel.Server.Unregistered import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client +import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.Unsafe (ConnectivityState(..)) import Network.GRPC.Unsafe.Op (StatusCode(..)) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index d8349ca..127b4ca 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -1,21 +1,24 @@ -{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- calls; for unregistered call support, see +-- `Network.GRPC.LowLevel.Call.Unregistered`. module Network.GRPC.LowLevel.Call where import Control.Monad -import Data.ByteString (ByteString) -import Data.String (IsString) -import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek) +import Data.ByteString (ByteString) +import Data.String (IsString) +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Time as C -import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C -import Network.GRPC.LowLevel.GRPC (grpcDebug, MetadataMap) +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) -- | Models the four types of RPC call supported by gRPC. We currently only -- support the first alternative, and only in a preliminary fashion. @@ -82,28 +85,6 @@ serverRegCallGetPayload ServerRegCall{..} = do then return Nothing else Just <$> C.copyByteBufferToByteString bb --- | Represents one unregistered GRPC call on the server. --- Contains pointers to all the C state needed to respond to an unregistered --- call. -data ServerUnregCall = ServerUnregCall - {internalServerUnregCall :: C.Call, - requestMetadataRecvUnreg :: Ptr C.MetadataArray, - parentPtrUnreg :: Maybe (Ptr C.Call), - callDetails :: C.CallDetails} - -serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap -serverUnregCallGetMetadata ServerUnregCall{..} = do - marray <- peek requestMetadataRecvUnreg - C.getAllMetadataArray marray - -serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName -serverUnregCallGetMethodName ServerUnregCall{..} = - MethodName <$> C.callDetailsGetMethod callDetails - -serverUnregCallGetHost :: ServerUnregCall -> IO Host -serverUnregCallGetHost ServerUnregCall{..} = - Host <$> C.callDetailsGetHost callDetails - debugClientCall :: ClientCall -> IO () {-# INLINE debugClientCall #-} #ifdef DEBUG @@ -138,28 +119,6 @@ debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do debugServerRegCall = const $ return () #endif -debugServerUnregCall :: ServerUnregCall -> IO () -#ifdef DEBUG -debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do - grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerUnregCall: metadata ptr: " - ++ show (requestMetadataRecvUnreg call) - metadataArr <- peek (requestMetadataRecvUnreg call) - metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata) - forM_ (parentPtrUnreg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' - (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerUnregCall: callDetails ptr: " - ++ show (callDetails call) - --TODO: need functions for getting data out of call_details. -#else -{-# INLINE debugServerUnregCall #-} -debugServerUnregCall = const $ return () -#endif - - destroyClientCall :: ClientCall -> IO () destroyClientCall ClientCall{..} = do grpcDebug "Destroying client-side call object." @@ -180,17 +139,3 @@ destroyServerRegCall call@ServerRegCall{..} = do forM_ parentPtrReg free grpcDebug $ "destroying deadline." ++ show callDeadline C.timespecDestroy callDeadline - -destroyServerUnregCall :: ServerUnregCall -> IO () -destroyServerUnregCall call@ServerUnregCall{..} = do - grpcDebug "destroyServerUnregCall: entered." - debugServerUnregCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerUnregCall - C.grpcCallDestroy internalServerUnregCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg - C.metadataArrayDestroy requestMetadataRecvUnreg - grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg - forM_ parentPtrUnreg free - grpcDebug $ "destroying call details: " ++ show callDetails - C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs new file mode 100644 index 0000000..de6fb36 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -0,0 +1,75 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Call.Unregistered where + +import Control.Monad +import Data.ByteString (ByteString) +import Data.String (IsString) +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek) + +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C + +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) + +-- | Represents one unregistered GRPC call on the server. +-- Contains pointers to all the C state needed to respond to an unregistered +-- call. +data ServerUnregCall = ServerUnregCall + {internalServerUnregCall :: C.Call, + requestMetadataRecvUnreg :: Ptr C.MetadataArray, + parentPtrUnreg :: Maybe (Ptr C.Call), + callDetails :: C.CallDetails} + +serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap +serverUnregCallGetMetadata ServerUnregCall{..} = do + marray <- peek requestMetadataRecvUnreg + C.getAllMetadataArray marray + +serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName +serverUnregCallGetMethodName ServerUnregCall{..} = + MethodName <$> C.callDetailsGetMethod callDetails + +serverUnregCallGetHost :: ServerUnregCall -> IO Host +serverUnregCallGetHost ServerUnregCall{..} = + Host <$> C.callDetailsGetHost callDetails + +debugServerUnregCall :: ServerUnregCall -> IO () +#ifdef DEBUG +debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do + grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr) + grpcDebug $ "debugServerUnregCall: metadata ptr: " + ++ show (requestMetadataRecvUnreg call) + metadataArr <- peek (requestMetadataRecvUnreg call) + metadata <- C.getAllMetadataArray metadataArr + grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata) + forM_ (parentPtrUnreg call) $ \parentPtr' -> do + grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + (C.Call parent) <- peek parentPtr' + grpcDebug $ "debugServerRegCall: parent: " ++ show parent + grpcDebug $ "debugServerUnregCall: callDetails ptr: " + ++ show (callDetails call) + --TODO: need functions for getting data out of call_details. +#else +{-# INLINE debugServerUnregCall #-} +debugServerUnregCall = const $ return () +#endif + +destroyServerUnregCall :: ServerUnregCall -> IO () +destroyServerUnregCall call@ServerUnregCall{..} = do + grpcDebug "destroyServerUnregCall: entered." + debugServerUnregCall call + grpcDebug $ "Destroying server-side call object: " + ++ show internalServerUnregCall + C.grpcCallDestroy internalServerUnregCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg + C.metadataArrayDestroy requestMetadataRecvUnreg + grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg + forM_ parentPtrUnreg free + grpcDebug $ "destroying call details: " ++ show callDetails + C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index 3b57381..ffa4479 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -2,24 +2,24 @@ module Network.GRPC.LowLevel.Client where -import Control.Exception (bracket, finally) -import Control.Monad (join) -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Time as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Op as C +import Control.Exception (bracket, finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Time as C -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op -- | Represents the context needed to perform client-side gRPC operations. data Client = Client {clientChannel :: C.Channel, - clientCQ :: CompletionQueue, - clientConfig :: ClientConfig + clientCQ :: CompletionQueue, + clientConfig :: ClientConfig } -- | Configuration necessary to set up a client. @@ -94,32 +94,6 @@ withClientRegisteredCall client regmethod timeout f = do where logDestroy c = grpcDebug "withClientRegisteredCall: destroying." >> destroyClientCall c --- | Create a call on the client for an endpoint without using the --- method registration machinery. In practice, we'll probably only use the --- registered method version, but we include this for completeness and testing. -clientCreateCall :: Client - -> MethodName - -> TimeoutSeconds - -> IO (Either GRPCIOError ClientCall) -clientCreateCall Client{..} meth timeout = do - let parentCall = C.Call nullPtr - C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateCall clientChannel parentCall C.propagateDefaults - clientCQ meth (clientEndpoint clientConfig) deadline - -withClientCall :: Client - -> MethodName - -> TimeoutSeconds - -> (ClientCall -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withClientCall client method timeout f = do - createResult <- clientCreateCall client method timeout - case createResult of - Left x -> return $ Left x - Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientCall: destroying." - >> destroyClientCall c - data NormalRequestResult = NormalRequestResult { rspBody :: ByteString , initMD :: Maybe MetadataMap -- initial metadata @@ -196,29 +170,6 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) return $ Right $ compileNormalRequestResults (rs ++ rs') _ -> error "Streaming methods not yet implemented." --- | Makes a normal (non-streaming) request without needing to register a method --- first. Probably only useful for testing. TODO: This is preliminary, like --- 'clientRegisteredRequest'. -clientRequest :: Client - -> MethodName - -- ^ Method name, e.g. "/foo" - -> TimeoutSeconds - -- ^ "Number of seconds until request times out" - -> ByteString - -- ^ Request body. - -> MetadataMap - -- ^ Request metadata. - -> IO (Either GRPCIOError NormalRequestResult) -clientRequest client@Client{..} meth timeLimit body meta = - fmap join $ do - withClientCall client meth timeLimit $ \call -> do - let ops = clientNormalRequestOps body meta - results <- runClientOps call clientCQ ops timeLimit - grpcDebug "clientRequest: ops ran." - case results of - Left x -> return $ Left x - Right rs -> return $ Right $ compileNormalRequestResults rs - clientNormalRequestOps :: ByteString -> MetadataMap -> [Op] clientNormalRequestOps body metadata = [OpSendInitialMetadata metadata, diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs new file mode 100644 index 0000000..9577da0 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -0,0 +1,66 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Client.Unregistered where + +import Control.Exception (finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C + +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Client +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op + +-- | Create a call on the client for an endpoint without using the +-- method registration machinery. In practice, we'll probably only use the +-- registered method version, but we include this for completeness and testing. +clientCreateCall :: Client + -> MethodName + -> TimeoutSeconds + -> IO (Either GRPCIOError ClientCall) +clientCreateCall Client{..} meth timeout = do + let parentCall = C.Call nullPtr + C.withDeadlineSeconds timeout $ \deadline -> do + channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ meth (clientEndpoint clientConfig) deadline + +withClientCall :: Client + -> MethodName + -> TimeoutSeconds + -> (ClientCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withClientCall client method timeout f = do + createResult <- clientCreateCall client method timeout + case createResult of + Left x -> return $ Left x + Right call -> f call `finally` logDestroy call + where logDestroy c = grpcDebug "withClientCall: destroying." + >> destroyClientCall c + +-- | Makes a normal (non-streaming) request without needing to register a method +-- first. Probably only useful for testing. TODO: This is preliminary, like +-- 'clientRegisteredRequest'. +clientRequest :: Client + -> MethodName + -- ^ Method name, e.g. "/foo" + -> TimeoutSeconds + -- ^ "Number of seconds until request times out" + -> ByteString + -- ^ Request body. + -> MetadataMap + -- ^ Request metadata. + -> IO (Either GRPCIOError NormalRequestResult) +clientRequest client@Client{..} meth timeLimit body meta = + fmap join $ do + withClientCall client meth timeLimit $ \call -> do + let ops = clientNormalRequestOps body meta + results <- runClientOps call clientCQ ops timeLimit + grpcDebug "clientRequest: ops ran." + case results of + Left x -> return $ Left x + Right rs -> return $ Right $ compileNormalRequestResults rs diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index dd14388..2b63978 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -25,25 +25,29 @@ module Network.GRPC.LowLevel.CompletionQueue ) where -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, check, retry) -import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, - readTVar, writeTVar) -import Control.Exception (bracket) -import Data.IORef (IORef, atomicModifyIORef', - newIORef) -import Data.List (intersperse) -import Foreign.Marshal.Alloc (free, malloc) -import Foreign.Ptr (nullPtr, plusPtr) -import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Op as C -import qualified Network.GRPC.Unsafe.Time as C -import System.Timeout (timeout) +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.STM (atomically, check, + retry) +import Control.Concurrent.STM.TVar (TVar, modifyTVar', + newTVarIO, readTVar, + writeTVar) +import Control.Exception (bracket) +import Data.IORef (IORef, + atomicModifyIORef', + newIORef) +import Data.List (intersperse) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Ptr (nullPtr, plusPtr) +import Foreign.Storable (peek) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Time as C +import System.Timeout (timeout) import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.LowLevel.GRPC -- NOTE: the concurrency requirements for a CompletionQueue are a little diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index fc39016..b654605 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -4,22 +4,23 @@ module Network.GRPC.LowLevel.Op where import Control.Exception -import qualified Data.ByteString as B -import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) -import Data.String (IsString) -import Foreign.C.String (CString) -import Foreign.C.Types (CInt) -import Foreign.Marshal.Alloc (free, malloc, - mallocBytes) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek, poke) -import qualified Network.GRPC.Unsafe as C (Call) -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Op as C +import qualified Data.ByteString as B +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) +import Data.String (IsString) +import Foreign.C.String (CString) +import Foreign.C.Types (CInt) +import Foreign.Marshal.Alloc (free, malloc, + mallocBytes) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek, poke) +import qualified Network.GRPC.Unsafe as C (Call) +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 4c3bc74..6b269c3 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -2,41 +2,40 @@ module Network.GRPC.LowLevel.Server where -import Control.Concurrent (threadDelay) -import Control.Exception (bracket, finally) +import Control.Exception (bracket, finally) import Control.Monad -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, - TimeoutSeconds, - createCompletionQueue, - pluck, - serverRegisterCompletionQueue, - serverRequestCall, - serverRequestRegisteredCall, - serverShutdownAndNotify, - shutdownCompletionQueue) +import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, + TimeoutSeconds, + createCompletionQueue, + pluck, + serverRegisterCompletionQueue, + serverRequestCall, + serverRequestRegisteredCall, + serverShutdownAndNotify, + shutdownCompletionQueue) import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op -- | Wraps various gRPC state needed to run a server. data Server = Server - { internalServer :: C.Server - , serverCQ :: CompletionQueue + { internalServer :: C.Server + , serverCQ :: CompletionQueue , registeredMethods :: [RegisteredMethod] - , serverConfig :: ServerConfig + , serverConfig :: ServerConfig } -- | Configuration needed to start a server. data ServerConfig = ServerConfig - { host :: Host + { host :: Host -- ^ Name of the host the server is running on. Not sure how this is -- used. Setting to "localhost" works fine in tests. - , port :: Port + , port :: Port -- ^ Port on which to listen for requests. , methodsToRegister :: [(MethodName, GRPCMethodType)] -- ^ List of (method name, method type) tuples specifying all methods to @@ -141,22 +140,6 @@ withServerRegisteredCall server regmethod timeout initMeta f = do where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." >> destroyServerRegCall c -serverCreateUnregCall :: Server -> TimeoutSeconds - -> IO (Either GRPCIOError ServerUnregCall) -serverCreateUnregCall Server{..} timeLimit = - serverRequestCall internalServer serverCQ timeLimit - -withServerUnregCall :: Server -> TimeoutSeconds - -> (ServerUnregCall -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withServerUnregCall server timeout f = do - createResult <- serverCreateUnregCall server timeout - case createResult of - Left x -> return $ Left x - Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withServerCall: destroying." - >> destroyServerUnregCall c - -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. serverOpsGetNormalCall :: MetadataMap -> [Op] serverOpsGetNormalCall initMetadata = @@ -230,43 +213,3 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do case respOpsResults of Left x -> return $ Left x Right _ -> return $ Right () - --- TODO: This is preliminary. --- We still need to provide the method name to the handler. --- | Handle one unregistered call. -serverHandleNormalCall :: Server -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata. - -> (ByteString -> MetadataMap -> MethodName - -> IO (ByteString, MetadataMap, StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and metadata. - -> IO (Either GRPCIOError ()) -serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do - withServerUnregCall s timeLimit $ \call -> do - grpcDebug "serverHandleNormalCall: starting batch." - let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- runServerUnregOps call serverCQ recvOps timeLimit - case opResults of - Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" - return $ Left x - Right [OpRecvMessageResult (Just body)] -> do - requestMeta <- serverUnregCallGetMetadata call - grpcDebug $ "got client metadata: " ++ show requestMeta - methodName <- serverUnregCallGetMethodName call - hostName <- serverUnregCallGetHost call - grpcDebug $ "call_details host is: " ++ show hostName - (respBody, respMetadata, details) <- f body requestMeta methodName - let status = C.GrpcStatusOk - let respOps = serverOpsSendNormalResponse - respBody respMetadata status details - respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit - case respOpsResults of - Left x -> do grpcDebug "serverHandleNormalCall: resp failed." - return $ Left x - Right _ -> grpcDebug "serverHandleNormalCall: ops done." - >> return (Right ()) - x -> error $ "impossible pattern match: " ++ show x - -_nowarn_unused :: a -_nowarn_unused = undefined threadDelay diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs new file mode 100644 index 0000000..d9bcb0c --- /dev/null +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -0,0 +1,78 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Server.Unregistered where + +import Control.Exception (bracket, finally) +import Control.Monad +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C + +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call.Unregistered +import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, + TimeoutSeconds, + createCompletionQueue, + pluck, + serverRegisterCompletionQueue, + serverRequestCall, + serverRequestRegisteredCall, + serverShutdownAndNotify, + shutdownCompletionQueue) +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op +import Network.GRPC.LowLevel.Server + +serverCreateUnregCall :: Server -> TimeoutSeconds + -> IO (Either GRPCIOError ServerUnregCall) +serverCreateUnregCall Server{..} timeLimit = + serverRequestCall internalServer serverCQ timeLimit + +withServerUnregCall :: Server -> TimeoutSeconds + -> (ServerUnregCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withServerUnregCall server timeout f = do + createResult <- serverCreateUnregCall server timeout + case createResult of + Left x -> return $ Left x + Right call -> f call `finally` logDestroy call + where logDestroy c = grpcDebug "withServerCall: destroying." + >> destroyServerUnregCall c + +-- TODO: This is preliminary. +-- We still need to provide the method name to the handler. +-- | Handle one unregistered call. +serverHandleNormalCall :: Server -> TimeoutSeconds + -> MetadataMap + -- ^ Initial server metadata. + -> (ByteString -> MetadataMap -> MethodName + -> IO (ByteString, MetadataMap, StatusDetails)) + -- ^ Handler function takes a request body and + -- metadata and returns a response body and metadata. + -> IO (Either GRPCIOError ()) +serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do + withServerUnregCall s timeLimit $ \call -> do + grpcDebug "serverHandleNormalCall: starting batch." + let recvOps = serverOpsGetNormalCall srvMetadata + opResults <- runServerUnregOps call serverCQ recvOps timeLimit + case opResults of + Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" + return $ Left x + Right [OpRecvMessageResult (Just body)] -> do + requestMeta <- serverUnregCallGetMetadata call + grpcDebug $ "got client metadata: " ++ show requestMeta + methodName <- serverUnregCallGetMethodName call + hostName <- serverUnregCallGetHost call + grpcDebug $ "call_details host is: " ++ show hostName + (respBody, respMetadata, details) <- f body requestMeta methodName + let status = C.GrpcStatusOk + let respOps = serverOpsSendNormalResponse + respBody respMetadata status details + respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit + case respOpsResults of + Left x -> do grpcDebug "serverHandleNormalCall: resp failed." + return $ Left x + Right _ -> grpcDebug "serverHandleNormalCall: ops done." + >> return (Right ()) + x -> error $ "impossible pattern match: " ++ show x From 27a9a6283a0a1946c727b53d2a106b2812e2bd5f Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 11:41:58 -0500 Subject: [PATCH 3/9] Rename unreg operations in Network.GRPC.LowLevel.Call.Unregistered --- examples/echo/echo-server/Main.hs | 20 ++--- grpc-haskell.cabal | 2 +- src/Network/GRPC/LowLevel.hs | 4 - .../GRPC/LowLevel/Call/Unregistered.hs | 77 +++++++++---------- src/Network/GRPC/LowLevel/CompletionQueue.hs | 12 +-- src/Network/GRPC/LowLevel/Op.hs | 6 +- .../GRPC/LowLevel/Server/Unregistered.hs | 33 +++----- 7 files changed, 69 insertions(+), 85 deletions(-) diff --git a/examples/echo/echo-server/Main.hs b/examples/echo/echo-server/Main.hs index 30d8b2b..3a28d25 100644 --- a/examples/echo/echo-server/Main.hs +++ b/examples/echo/echo-server/Main.hs @@ -1,18 +1,20 @@ +{-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +{-# OPTIONS_GHC -fno-warn-unused-binds #-} -import Control.Concurrent.Async (async, wait) -import Control.Monad (forever) -import Data.ByteString (ByteString) -import qualified Data.Map as M -import Network.GRPC.LowLevel - +import Control.Concurrent.Async (async, wait) +import Control.Monad (forever) +import Data.ByteString (ByteString) +import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Server.Unregistered as U serverMeta :: MetadataMap -serverMeta = M.fromList [("test_meta", "test_meta_value")] +serverMeta = [("test_meta", "test_meta_value")] handler :: ByteString -> MetadataMap -> MethodName -> IO (ByteString, MetadataMap, StatusDetails) -handler reqBody reqMeta method = do +handler reqBody _reqMeta _method = do --putStrLn $ "Got request for method: " ++ show method --putStrLn $ "Got metadata: " ++ show reqMeta return (reqBody, serverMeta, StatusDetails "") @@ -20,7 +22,7 @@ handler reqBody reqMeta method = do unregMain :: IO () unregMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do - result <- serverHandleNormalCall server 15 serverMeta handler + result <- U.serverHandleNormalCall server 15 serverMeta handler case result of Left x -> putStrLn $ "handle call result error: " ++ show x Right _ -> return () diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 0386528..eb76638 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -42,12 +42,12 @@ library Network.GRPC.Unsafe.Op Network.GRPC.Unsafe Network.GRPC.LowLevel + Network.GRPC.LowLevel.Server.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op Network.GRPC.LowLevel.Server - Network.GRPC.LowLevel.Server.Unregistered Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Client diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index 26b8f52..af91fbb 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -30,12 +30,9 @@ GRPC , ServerConfig(..) , Server , ServerRegCall -, ServerUnregCall , registeredMethods , withServer , serverHandleNormalRegisteredCall -, serverHandleNormalCall -, withServerUnregCall , withServerRegisteredCall -- * Client @@ -61,7 +58,6 @@ GRPC import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Server -import Network.GRPC.LowLevel.Server.Unregistered import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs index de6fb36..60307ef 100644 --- a/src/Network/GRPC/LowLevel/Call/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -3,16 +3,12 @@ module Network.GRPC.LowLevel.Call.Unregistered where import Control.Monad -import Data.ByteString (ByteString) -import Data.String (IsString) import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Ptr (Ptr) import Foreign.Storable (peek) import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.ByteBuffer as C import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Time as C import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) @@ -20,56 +16,57 @@ import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) -- | Represents one unregistered GRPC call on the server. -- Contains pointers to all the C state needed to respond to an unregistered -- call. -data ServerUnregCall = ServerUnregCall - {internalServerUnregCall :: C.Call, - requestMetadataRecvUnreg :: Ptr C.MetadataArray, - parentPtrUnreg :: Maybe (Ptr C.Call), - callDetails :: C.CallDetails} +data ServerCall = ServerCall + { internalServerCall :: C.Call + , requestMetadataRecv :: Ptr C.MetadataArray + , parentPtr :: Maybe (Ptr C.Call) + , callDetails :: C.CallDetails + } -serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap -serverUnregCallGetMetadata ServerUnregCall{..} = do - marray <- peek requestMetadataRecvUnreg +serverCallGetMetadata :: ServerCall -> IO MetadataMap +serverCallGetMetadata ServerCall{..} = do + marray <- peek requestMetadataRecv C.getAllMetadataArray marray -serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName -serverUnregCallGetMethodName ServerUnregCall{..} = +serverCallGetMethodName :: ServerCall -> IO MethodName +serverCallGetMethodName ServerCall{..} = MethodName <$> C.callDetailsGetMethod callDetails -serverUnregCallGetHost :: ServerUnregCall -> IO Host -serverUnregCallGetHost ServerUnregCall{..} = +serverCallGetHost :: ServerCall -> IO Host +serverCallGetHost ServerCall{..} = Host <$> C.callDetailsGetHost callDetails -debugServerUnregCall :: ServerUnregCall -> IO () +debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do - grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerUnregCall: metadata ptr: " - ++ show (requestMetadataRecvUnreg call) - metadataArr <- peek (requestMetadataRecvUnreg call) +debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do + grpcDebug $ "debugServerCall(U): server call: " ++ (show ptr) + grpcDebug $ "debugServerCall(U): metadata ptr: " + ++ show (requestMetadataRecv call) + metadataArr <- peek (requestMetadataRecv call) metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata) - forM_ (parentPtrUnreg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + grpcDebug $ "debugServerCall(U): metadata received: " ++ (show metadata) + forM_ (parentPtr call) $ \parentPtr' -> do + grpcDebug $ "debugServerCall(U): parent ptr: " ++ show parentPtr' (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerUnregCall: callDetails ptr: " + grpcDebug $ "debugServerCall(U): parent: " ++ show parent + grpcDebug $ "debugServerCall(U): callDetails ptr: " ++ show (callDetails call) --TODO: need functions for getting data out of call_details. #else -{-# INLINE debugServerUnregCall #-} -debugServerUnregCall = const $ return () +{-# INLINE debugServerCall #-} +debugServerCall = const $ return () #endif -destroyServerUnregCall :: ServerUnregCall -> IO () -destroyServerUnregCall call@ServerUnregCall{..} = do - grpcDebug "destroyServerUnregCall: entered." - debugServerUnregCall call +destroyServerCall :: ServerCall -> IO () +destroyServerCall call@ServerCall{..} = do + grpcDebug "destroyServerCall(U): entered." + debugServerCall call grpcDebug $ "Destroying server-side call object: " - ++ show internalServerUnregCall - C.grpcCallDestroy internalServerUnregCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg - C.metadataArrayDestroy requestMetadataRecvUnreg - grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg - forM_ parentPtrUnreg free + ++ show internalServerCall + C.grpcCallDestroy internalServerCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv + C.metadataArrayDestroy requestMetadataRecv + grpcDebug $ "freeing parentPtr: " ++ show parentPtr + forM_ parentPtr free grpcDebug $ "destroying call details: " ++ show callDetails C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 2b63978..44ebddb 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -47,7 +47,7 @@ import qualified Network.GRPC.Unsafe.Time as C import System.Timeout (timeout) import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered +import qualified Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.GRPC -- NOTE: the concurrency requirements for a CompletionQueue are a little @@ -310,7 +310,7 @@ serverRequestRegisteredCall free bbPtr serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError ServerUnregCall) + -> IO (Either GRPCIOError U.ServerCall) serverRequestCall server cq@CompletionQueue{..} timeLimit = withPermission Push cq $ do callPtr <- malloc @@ -337,10 +337,10 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit = return $ Left x Right () -> do rawCall <- peek callPtr - let call = ServerUnregCall rawCall - metadataArrayPtr - Nothing - callDetails + let call = U.ServerCall rawCall + metadataArrayPtr + Nothing + callDetails return $ Right call --TODO: the gRPC library appears to hold onto these pointers for a random diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index b654605..9d71bb1 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -20,7 +20,7 @@ import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered +import Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC @@ -243,12 +243,12 @@ runServerRegOps :: ServerRegCall -> IO (Either GRPCIOError [OpRecvResult]) runServerRegOps = runOps . internalServerRegCall -runServerUnregOps :: ServerUnregCall +runServerUnregOps :: U.ServerCall -> CompletionQueue -> [Op] -> TimeoutSeconds -> IO (Either GRPCIOError [OpRecvResult]) -runServerUnregOps = runOps . internalServerUnregCall +runServerUnregOps = runOps . U.internalServerCall -- | Like 'runServerOps', but for client-side calls. runClientOps :: ClientCall diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index d9bcb0c..e5f630c 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -2,35 +2,24 @@ module Network.GRPC.LowLevel.Server.Unregistered where -import Control.Exception (bracket, finally) -import Control.Monad +import Control.Exception (finally) import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C - -import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Call (MethodName) import Network.GRPC.LowLevel.Call.Unregistered -import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, - TimeoutSeconds, - createCompletionQueue, - pluck, - serverRegisterCompletionQueue, - serverRequestCall, - serverRequestRegisteredCall, - serverShutdownAndNotify, - shutdownCompletionQueue) +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds, + serverRequestCall) import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Server +import qualified Network.GRPC.Unsafe.Op as C serverCreateUnregCall :: Server -> TimeoutSeconds - -> IO (Either GRPCIOError ServerUnregCall) + -> IO (Either GRPCIOError ServerCall) serverCreateUnregCall Server{..} timeLimit = serverRequestCall internalServer serverCQ timeLimit withServerUnregCall :: Server -> TimeoutSeconds - -> (ServerUnregCall -> IO (Either GRPCIOError a)) + -> (ServerCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) withServerUnregCall server timeout f = do createResult <- serverCreateUnregCall server timeout @@ -38,7 +27,7 @@ withServerUnregCall server timeout f = do Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerCall: destroying." - >> destroyServerUnregCall c + >> destroyServerCall c -- TODO: This is preliminary. -- We still need to provide the method name to the handler. @@ -60,10 +49,10 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" return $ Left x Right [OpRecvMessageResult (Just body)] -> do - requestMeta <- serverUnregCallGetMetadata call + requestMeta <- serverCallGetMetadata call grpcDebug $ "got client metadata: " ++ show requestMeta - methodName <- serverUnregCallGetMethodName call - hostName <- serverUnregCallGetHost call + methodName <- serverCallGetMethodName call + hostName <- serverCallGetHost call grpcDebug $ "call_details host is: " ++ show hostName (respBody, respMetadata, details) <- f body requestMeta methodName let status = C.GrpcStatusOk From eb1040d07b8044bb4c74bfa5c6bb3ac199cf74b4 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 11:50:57 -0500 Subject: [PATCH 4/9] Rename unreg operations in Network.GRPC.LowLevel.Server.Unregistered --- .../GRPC/LowLevel/Server/Unregistered.hs | 41 ++++++++++--------- tests/LowLevelTests.hs | 16 +++++--- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index e5f630c..0c8273f 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -13,40 +13,41 @@ import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Server import qualified Network.GRPC.Unsafe.Op as C -serverCreateUnregCall :: Server -> TimeoutSeconds +serverCreateCall :: Server -> TimeoutSeconds -> IO (Either GRPCIOError ServerCall) -serverCreateUnregCall Server{..} timeLimit = +serverCreateCall Server{..} timeLimit = serverRequestCall internalServer serverCQ timeLimit -withServerUnregCall :: Server -> TimeoutSeconds +withServerCall :: Server -> TimeoutSeconds -> (ServerCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withServerUnregCall server timeout f = do - createResult <- serverCreateUnregCall server timeout +withServerCall server timeout f = do + createResult <- serverCreateCall server timeout case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerCall: destroying." >> destroyServerCall c --- TODO: This is preliminary. --- We still need to provide the method name to the handler. +-- | A handler for an unregistered server call; bytestring arguments are the +-- request body and response body respectively. +type ServerHandler + = ByteString -> MetadataMap -> MethodName + -> IO (ByteString, MetadataMap, StatusDetails) + -- | Handle one unregistered call. -serverHandleNormalCall :: Server -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata. - -> (ByteString -> MetadataMap -> MethodName - -> IO (ByteString, MetadataMap, StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and metadata. - -> IO (Either GRPCIOError ()) +serverHandleNormalCall :: Server + -> TimeoutSeconds + -> MetadataMap -- ^ Initial server metadata. + -> ServerHandler + -> IO (Either GRPCIOError ()) serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do - withServerUnregCall s timeLimit $ \call -> do - grpcDebug "serverHandleNormalCall: starting batch." + withServerCall s timeLimit $ \call -> do + grpcDebug "serverHandleCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata opResults <- runServerUnregOps call serverCQ recvOps timeLimit case opResults of - Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" + Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" return $ Left x Right [OpRecvMessageResult (Just body)] -> do requestMeta <- serverCallGetMetadata call @@ -60,8 +61,8 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do respBody respMetadata status details respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit case respOpsResults of - Left x -> do grpcDebug "serverHandleNormalCall: resp failed." + Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." return $ Left x - Right _ -> grpcDebug "serverHandleNormalCall: ops done." + Right _ -> grpcDebug "serverHandleNormalCall(U): ops done." >> return (Right ()) x -> error $ "impossible pattern match: " ++ show x diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 361bf33..2e0032c 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -5,15 +5,19 @@ module LowLevelTests (lowLevelTests) where -import Control.Concurrent (threadDelay) +import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.Map as M +import Data.ByteString (ByteString) +import qualified Data.Map as M import Network.GRPC.LowLevel +import Network.GRPC.LowLevel.Server.Unregistered as U import Test.Tasty -import Test.Tasty.HUnit as HU (Assertion, assertEqual, - assertFailure, testCase, (@?=)) +import Test.Tasty.HUnit as HU (Assertion, + assertEqual, + assertFailure, + testCase, + (@?=)) lowLevelTests :: TestTree lowLevelTests = testGroup "Unit tests of low-level Haskell library" @@ -63,7 +67,7 @@ testServerCreateDestroy = testServerCall :: TestTree testServerCall = serverOnlyTest "create/destroy call" [] $ \s -> do - r <- withServerUnregCall s 1 $ const $ return $ Right () + r <- U.withServerCall s 1 $ const $ return $ Right () r @?= Left GRPCIOTimeout testServerTimeoutNoClient :: TestTree From e8d3e6450e9e4fc4d514f089fac0a65a93b07fff Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 12:45:47 -0500 Subject: [PATCH 5/9] Split up CompletionQueue into CompletionQueue.{Unregistered,Internal} and add Unregistered namespace for Op --- grpc-haskell.cabal | 3 + src/Network/GRPC/LowLevel.hs | 2 - .../GRPC/LowLevel/Client/Unregistered.hs | 21 +- src/Network/GRPC/LowLevel/CompletionQueue.hs | 203 +----------------- .../GRPC/LowLevel/CompletionQueue/Internal.hs | 134 ++++++++++++ .../LowLevel/CompletionQueue/Unregistered.hs | 78 +++++++ src/Network/GRPC/LowLevel/GRPC.hs | 10 +- src/Network/GRPC/LowLevel/Op.hs | 13 +- src/Network/GRPC/LowLevel/Op/Unregistered.hs | 13 ++ src/Network/GRPC/LowLevel/Server.hs | 1 - .../GRPC/LowLevel/Server/Unregistered.hs | 21 +- 11 files changed, 261 insertions(+), 238 deletions(-) create mode 100644 src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs create mode 100644 src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs create mode 100644 src/Network/GRPC/LowLevel/Op/Unregistered.hs diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index eb76638..6375ae2 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -45,8 +45,11 @@ library Network.GRPC.LowLevel.Server.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue + Network.GRPC.LowLevel.CompletionQueue.Internal + Network.GRPC.LowLevel.CompletionQueue.Unregistered Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op + Network.GRPC.LowLevel.Op.Unregistered Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index af91fbb..7bab209 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -50,7 +50,6 @@ GRPC -- * Ops , runClientOps , runServerRegOps -, runServerUnregOps , Op(..) , OpRecvResult(..) @@ -63,7 +62,6 @@ import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.Unsafe (ConnectivityState(..)) import Network.GRPC.Unsafe.Op (StatusCode(..)) diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs index 9577da0..7f5fa33 100644 --- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -2,17 +2,18 @@ module Network.GRPC.LowLevel.Client.Unregistered where -import Control.Exception (finally) -import Control.Monad (join) -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Time as C +import Control.Exception (finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.Client -import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op @@ -26,8 +27,8 @@ clientCreateCall :: Client clientCreateCall Client{..} meth timeout = do let parentCall = C.Call nullPtr C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateCall clientChannel parentCall C.propagateDefaults - clientCQ meth (clientEndpoint clientConfig) deadline + U.channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ meth (clientEndpoint clientConfig) deadline withClientCall :: Client -> MethodName diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 44ebddb..42aaaff 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -14,30 +14,23 @@ module Network.GRPC.LowLevel.CompletionQueue , pluck , startBatch , channelCreateRegisteredCall - , channelCreateCall , TimeoutSeconds , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify , serverRequestRegisteredCall - , serverRequestCall , newTag ) where -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, check, - retry) -import Control.Concurrent.STM.TVar (TVar, modifyTVar', - newTVarIO, readTVar, +import Control.Concurrent (forkIO) +import Control.Concurrent.STM (atomically, check) +import Control.Concurrent.STM.TVar (newTVarIO, readTVar, writeTVar) import Control.Exception (bracket) -import Data.IORef (IORef, - atomicModifyIORef', - newIORef) +import Data.IORef (newIORef) import Data.List (intersperse) import Foreign.Marshal.Alloc (free, malloc) -import Foreign.Ptr (nullPtr, plusPtr) import Foreign.Storable (peek) import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.Constants as C @@ -47,100 +40,8 @@ import qualified Network.GRPC.Unsafe.Time as C import System.Timeout (timeout) import Network.GRPC.LowLevel.Call -import qualified Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.GRPC - --- NOTE: the concurrency requirements for a CompletionQueue are a little --- complicated. There are two read operations: next and pluck. We can either --- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times --- concurrently, but we can't mix next and pluck calls. Fortunately, we only --- need to use next when we are shutting down the queue. Thus, we do two things --- to shut down: --- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck --- calls will be allowed to start. --- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. --- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. - --- NOTE: There is one more possible race condition: pushing work onto the queue --- after we begin to shut down. --- Solution: another counter, which must reach zero before the shutdown --- can start. - --- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of --- concurrent pushers to the CQ, but I don't know what the limit should be set --- to. I haven't found any documentation that suggests there is a limit imposed --- by the gRPC library, but there might be. Need to investigate further. - --- | Wraps the state necessary to use a gRPC completion queue. Completion queues --- are used to wait for batches gRPC operations ('Op's) to finish running, as --- well as wait for various other operations, such as server shutdown, pinging, --- checking to see if we've been disconnected, and so forth. -data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, - -- ^ All access to this field must be - -- guarded by a check of 'shuttingDown'. - currentPluckers :: TVar Int, - -- ^ Used to limit the number of - -- concurrent calls to pluck on this - -- queue. - -- The max value is set by gRPC in - -- 'C.maxCompletionQueuePluckers' - currentPushers :: TVar Int, - -- ^ Used to prevent new work from - -- being pushed onto the queue when - -- the queue begins to shut down. - shuttingDown :: TVar Bool, - -- ^ Used to prevent new pluck calls on - -- the queue when the queue begins to - -- shut down. - nextTag :: IORef Int - -- ^ Used to supply unique tags for work - -- items pushed onto the queue. - } - --- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. --- This will eventually wrap around after reaching @maxBound :: Int@, but from a --- practical perspective, that should be safe. -newTag :: CompletionQueue -> IO C.Tag -newTag CompletionQueue{..} = do - i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) - return $ C.Tag $ plusPtr nullPtr i - -maxWorkPushers :: Int -maxWorkPushers = 100 --TODO: figure out what this should be. - -data CQOpType = Push | Pluck deriving (Show, Eq, Enum) - -getCount :: CQOpType -> CompletionQueue -> TVar Int -getCount Push = currentPushers -getCount Pluck = currentPluckers - -getLimit :: CQOpType -> Int -getLimit Push = maxWorkPushers -getLimit Pluck = C.maxCompletionQueuePluckers - --- | Safely brackets an operation that pushes work onto or plucks results from --- the given 'CompletionQueue'. -withPermission :: CQOpType - -> CompletionQueue - -> IO (Either GRPCIOError a) - -> IO (Either GRPCIOError a) -withPermission op cq f = - bracket acquire release doOp - where acquire = atomically $ do - isShuttingDown <- readTVar (shuttingDown cq) - if isShuttingDown - then return False - else do currCount <- readTVar $ getCount op cq - if currCount < getLimit op - then modifyTVar' (getCount op cq) (+1) >> return True - else retry - doOp gotResource = if gotResource - then f - else return $ Left GRPCIOShutdown - release gotResource = - if gotResource - then atomically $ modifyTVar' (getCount op cq) (subtract 1) - else return () +import Network.GRPC.LowLevel.CompletionQueue.Internal withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue grpc = bracket (createCompletionQueue grpc) @@ -155,35 +56,6 @@ createCompletionQueue _ = do nextTag <- newIORef minBound return $ CompletionQueue{..} -type TimeoutSeconds = Int - --- | Translate 'C.Event' to an error. The caller is responsible for ensuring --- that the event actually corresponds to an error condition; a successful event --- will be translated to a 'GRPCIOUnknownError'. -eventToError :: C.Event -> (Either GRPCIOError a) -eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown -eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout -eventToError _ = Left GRPCIOUnknownError - --- | Returns true iff the given grpc_event was a success. -isEventSuccessful :: C.Event -> Bool -isEventSuccessful (C.Event C.OpComplete True _) = True -isEventSuccessful _ = False - --- | Waits for the given number of seconds for the given tag to appear on the --- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting --- down and cannot handle new requests. -pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds - -> IO (Either GRPCIOError ()) -pluck cq@CompletionQueue{..} tag waitSeconds = do - grpcDebug $ "pluck: called with tag: " ++ show tag - ++ " and wait: " ++ show waitSeconds - withPermission Pluck cq $ do - C.withDeadlineSeconds waitSeconds $ \deadline -> do - ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved - grpcDebug $ "pluck: finished. Event: " ++ show ev - return $ if isEventSuccessful ev then Right () else eventToError ev - -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the -- case, switch these to 'Maybe'. @@ -241,20 +113,6 @@ channelCreateRegisteredCall handle deadline C.reserved return $ Right $ ClientCall call -channelCreateCall :: C.Channel - -> C.Call - -> C.PropagationMask - -> CompletionQueue - -> MethodName - -> Endpoint - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = - withPermission Push cq $ do - call <- C.grpcChannelCreateCall chan parent mask unsafeCQ - (unMethodName meth) (unEndpoint endpt) deadline C.reserved - return $ Right $ ClientCall call - -- | Create the call object to handle a registered call. serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds -> RegisteredMethod -> MetadataMap @@ -309,54 +167,6 @@ serverRequestRegisteredCall C.metadataArrayDestroy metadataArrayPtr free bbPtr -serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError U.ServerCall) -serverRequestCall server cq@CompletionQueue{..} timeLimit = - withPermission Push cq $ do - callPtr <- malloc - grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr - callDetails <- C.createCallDetails - metadataArrayPtr <- C.metadataArrayCreate - metadataArray <- peek metadataArrayPtr - tag <- newTag cq - callError <- C.grpcServerRequestCall server callPtr callDetails - metadataArray unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestCall: callError was " ++ show callError - if callError /= C.CallOk - then do grpcDebug "serverRequestCall: got call error; cleaning up." - failureCleanup callPtr callDetails metadataArrayPtr - return $ Left $ GRPCIOCallError callError - else do pluckResult <- pluck cq tag timeLimit - grpcDebug $ "serverRequestCall: pluckResult was " - ++ show pluckResult - case pluckResult of - Left x -> do - grpcDebug "serverRequestCall: pluck error; cleaning up." - failureCleanup callPtr callDetails - metadataArrayPtr - return $ Left x - Right () -> do - rawCall <- peek callPtr - let call = U.ServerCall rawCall - metadataArrayPtr - Nothing - callDetails - return $ Right call - - --TODO: the gRPC library appears to hold onto these pointers for a random - -- amount of time, even after returning from the only call that uses them. - -- This results in malloc errors if - -- gRPC tries to modify them after we free them. To work around it, - -- we sleep for a while before freeing the objects. We should find a - -- permanent solution that's more robust. - where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do - threadDelaySecs 30 - grpcDebug "serverRequestCall: doing delayed cleanup." - free callPtr - C.destroyCallDetails callDetails - C.metadataArrayDestroy metadataArrayPtr - return () - -- | Register the server's completion queue. Must be done before the server is -- started. serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO () @@ -366,6 +176,3 @@ serverRegisterCompletionQueue server CompletionQueue{..} = serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify server CompletionQueue{..} tag = C.grpcServerShutdownAndNotify server unsafeCQ tag - -threadDelaySecs :: Int -> IO () -threadDelaySecs = threadDelay . (* 10^(6::Int)) diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs new file mode 100644 index 0000000..1facf67 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs @@ -0,0 +1,134 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Internal where + +import Control.Concurrent.STM (atomically, retry) +import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar) +import Control.Exception (bracket) +import Data.IORef (IORef, atomicModifyIORef') +import Foreign.Ptr (nullPtr, plusPtr) +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C + +-- NOTE: the concurrency requirements for a CompletionQueue are a little +-- complicated. There are two read operations: next and pluck. We can either +-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times +-- concurrently, but we can't mix next and pluck calls. Fortunately, we only +-- need to use next when we are shutting down the queue. Thus, we do two things +-- to shut down: +-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck +-- calls will be allowed to start. +-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. +-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. + +-- NOTE: There is one more possible race condition: pushing work onto the queue +-- after we begin to shut down. +-- Solution: another counter, which must reach zero before the shutdown +-- can start. + +-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of +-- concurrent pushers to the CQ, but I don't know what the limit should be set +-- to. I haven't found any documentation that suggests there is a limit imposed +-- by the gRPC library, but there might be. Need to investigate further. + +-- | Wraps the state necessary to use a gRPC completion queue. Completion queues +-- are used to wait for batches gRPC operations ('Op's) to finish running, as +-- well as wait for various other operations, such as server shutdown, pinging, +-- checking to see if we've been disconnected, and so forth. +data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, + -- ^ All access to this field must be + -- guarded by a check of 'shuttingDown'. + currentPluckers :: TVar Int, + -- ^ Used to limit the number of + -- concurrent calls to pluck on this + -- queue. + -- The max value is set by gRPC in + -- 'C.maxCompletionQueuePluckers' + currentPushers :: TVar Int, + -- ^ Used to prevent new work from + -- being pushed onto the queue when + -- the queue begins to shut down. + shuttingDown :: TVar Bool, + -- ^ Used to prevent new pluck calls on + -- the queue when the queue begins to + -- shut down. + nextTag :: IORef Int + -- ^ Used to supply unique tags for work + -- items pushed onto the queue. + } + +type TimeoutSeconds = Int + +data CQOpType = Push | Pluck deriving (Show, Eq, Enum) + +-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. +-- This will eventually wrap around after reaching @maxBound :: Int@, but from a +-- practical perspective, that should be safe. +newTag :: CompletionQueue -> IO C.Tag +newTag CompletionQueue{..} = do + i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) + return $ C.Tag $ plusPtr nullPtr i + +-- | Safely brackets an operation that pushes work onto or plucks results from +-- the given 'CompletionQueue'. +withPermission :: CQOpType + -> CompletionQueue + -> IO (Either GRPCIOError a) + -> IO (Either GRPCIOError a) +withPermission op cq f = + bracket acquire release doOp + where acquire = atomically $ do + isShuttingDown <- readTVar (shuttingDown cq) + if isShuttingDown + then return False + else do currCount <- readTVar $ getCount op cq + if currCount < getLimit op + then modifyTVar' (getCount op cq) (+1) >> return True + else retry + doOp gotResource = if gotResource + then f + else return $ Left GRPCIOShutdown + release gotResource = + if gotResource + then atomically $ modifyTVar' (getCount op cq) (subtract 1) + else return () + +-- | Waits for the given number of seconds for the given tag to appear on the +-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting +-- down and cannot handle new requests. +pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds + -> IO (Either GRPCIOError ()) +pluck cq@CompletionQueue{..} tag waitSeconds = do + grpcDebug $ "pluck: called with tag: " ++ show tag + ++ " and wait: " ++ show waitSeconds + withPermission Pluck cq $ do + C.withDeadlineSeconds waitSeconds $ \deadline -> do + ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved + grpcDebug $ "pluck: finished. Event: " ++ show ev + return $ if isEventSuccessful ev then Right () else eventToError ev + +-- | Translate 'C.Event' to an error. The caller is responsible for ensuring +-- that the event actually corresponds to an error condition; a successful event +-- will be translated to a 'GRPCIOUnknownError'. +eventToError :: C.Event -> (Either GRPCIOError a) +eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown +eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout +eventToError _ = Left GRPCIOUnknownError + +-- | Returns true iff the given grpc_event was a success. +isEventSuccessful :: C.Event -> Bool +isEventSuccessful (C.Event C.OpComplete True _) = True +isEventSuccessful _ = False + +maxWorkPushers :: Int +maxWorkPushers = 100 --TODO: figure out what this should be. + +getCount :: CQOpType -> CompletionQueue -> TVar Int +getCount Push = currentPushers +getCount Pluck = currentPluckers + +getLimit :: CQOpType -> Int +getLimit Push = maxWorkPushers +getLimit Pluck = C.maxCompletionQueuePluckers diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs new file mode 100644 index 0000000..40ea875 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -0,0 +1,78 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Unregistered where + +import Control.Concurrent (forkIO) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call +import qualified Network.GRPC.LowLevel.Call.Unregistered as U +import Network.GRPC.LowLevel.CompletionQueue.Internal +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C + +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> MethodName + -> Endpoint + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = + withPermission Push cq $ do + call <- C.grpcChannelCreateCall chan parent mask unsafeCQ + (unMethodName meth) (unEndpoint endpt) deadline C.reserved + return $ Right $ ClientCall call + + +serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds + -> IO (Either GRPCIOError U.ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit = + withPermission Push cq $ do + callPtr <- malloc + grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr + callDetails <- C.createCallDetails + metadataArrayPtr <- C.metadataArrayCreate + metadataArray <- peek metadataArrayPtr + tag <- newTag cq + callError <- C.grpcServerRequestCall server callPtr callDetails + metadataArray unsafeCQ unsafeCQ tag + grpcDebug $ "serverRequestCall: callError was " ++ show callError + if callError /= C.CallOk + then do grpcDebug "serverRequestCall: got call error; cleaning up." + failureCleanup callPtr callDetails metadataArrayPtr + return $ Left $ GRPCIOCallError callError + else do pluckResult <- pluck cq tag timeLimit + grpcDebug $ "serverRequestCall: pluckResult was " + ++ show pluckResult + case pluckResult of + Left x -> do + grpcDebug "serverRequestCall: pluck error; cleaning up." + failureCleanup callPtr callDetails + metadataArrayPtr + return $ Left x + Right () -> do + rawCall <- peek callPtr + let call = U.ServerCall rawCall + metadataArrayPtr + Nothing + callDetails + return $ Right call + + --TODO: the gRPC library appears to hold onto these pointers for a random + -- amount of time, even after returning from the only call that uses them. + -- This results in malloc errors if + -- gRPC tries to modify them after we free them. To work around it, + -- we sleep for a while before freeing the objects. We should find a + -- permanent solution that's more robust. + where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do + threadDelaySecs 30 + grpcDebug "serverRequestCall: doing delayed cleanup." + free callPtr + C.destroyCallDetails callDetails + C.metadataArrayDestroy metadataArrayPtr + return () diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index 65529d6..aa4106f 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -3,12 +3,7 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Network.GRPC.LowLevel.GRPC where -{- --- TODO: remove if not needed -import Control.Monad.IO.Class (liftIO, MonadIO) -import Control.Monad.Except (ExceptT(..), runExceptT, throwError, - MonadError) --} +import Control.Concurrent (threadDelay) import Control.Exception import qualified Data.ByteString as B import qualified Data.Map as M @@ -63,6 +58,9 @@ grpcDebug str = do tid <- myThreadId grpcDebug _ = return () #endif +threadDelaySecs :: Int -> IO () +threadDelaySecs = threadDelay . (* 10^(6::Int)) + {- -- TODO: remove this once finally decided on whether to use it. -- | Monad for running gRPC operations. diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 9d71bb1..4950338 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -7,7 +7,6 @@ import Control.Exception import qualified Data.ByteString as B import qualified Data.Map.Strict as M import Data.Maybe (catMaybes) -import Data.String (IsString) import Foreign.C.String (CString) import Foreign.C.Types (CInt) import Foreign.Marshal.Alloc (free, malloc, @@ -20,7 +19,6 @@ import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Call.Unregistered as U import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC @@ -224,7 +222,7 @@ runOps call cq ops timeLimit = -- | For a given call, run the given 'Op's on the given completion queue with -- the given tag. Blocks until the ops are complete or the given number of -- seconds have elapsed. --- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we +-- TODO: now that 'ServerRegCall' and 'U.ServerCall' are separate types, we -- could try to limit the input 'Op's more appropriately. E.g., we don't use -- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC -- handles that for us. @@ -243,13 +241,6 @@ runServerRegOps :: ServerRegCall -> IO (Either GRPCIOError [OpRecvResult]) runServerRegOps = runOps . internalServerRegCall -runServerUnregOps :: U.ServerCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runServerUnregOps = runOps . U.internalServerCall - -- | Like 'runServerOps', but for client-side calls. runClientOps :: ClientCall -> CompletionQueue @@ -263,6 +254,6 @@ runClientOps = runOps . internalClientCall extractStatusInfo :: [OpRecvResult] -> Maybe (MetadataMap, C.StatusCode, B.ByteString) extractStatusInfo [] = Nothing -extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = +extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) = Just (meta, code, details) extractStatusInfo (_:xs) = extractStatusInfo xs diff --git a/src/Network/GRPC/LowLevel/Op/Unregistered.hs b/src/Network/GRPC/LowLevel/Op/Unregistered.hs new file mode 100644 index 0000000..c41b8a1 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Op/Unregistered.hs @@ -0,0 +1,13 @@ +module Network.GRPC.LowLevel.Op.Unregistered where + +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.Call.Unregistered as U + +runServerOps :: U.ServerCall + -> CompletionQueue + -> [Op] + -> TimeoutSeconds + -> IO (Either GRPCIOError [OpRecvResult]) +runServerOps = runOps . U.internalServerCall diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 6b269c3..f34d676 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -15,7 +15,6 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, createCompletionQueue, pluck, serverRegisterCompletionQueue, - serverRequestCall, serverRequestRegisteredCall, serverShutdownAndNotify, shutdownCompletionQueue) diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 0c8273f..1a007d4 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -2,21 +2,22 @@ module Network.GRPC.LowLevel.Server.Unregistered where -import Control.Exception (finally) -import Data.ByteString (ByteString) -import Network.GRPC.LowLevel.Call (MethodName) +import Control.Exception (finally) +import Data.ByteString (ByteString) +import Network.GRPC.LowLevel.Call (MethodName) import Network.GRPC.LowLevel.Call.Unregistered -import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds, - serverRequestCall) +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.Op +import Network.GRPC.LowLevel.Op (OpRecvResult (..)) +import qualified Network.GRPC.LowLevel.Op.Unregistered as U import Network.GRPC.LowLevel.Server -import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server -> TimeoutSeconds -> IO (Either GRPCIOError ServerCall) serverCreateCall Server{..} timeLimit = - serverRequestCall internalServer serverCQ timeLimit + U.serverRequestCall internalServer serverCQ timeLimit withServerCall :: Server -> TimeoutSeconds -> (ServerCall -> IO (Either GRPCIOError a)) @@ -45,7 +46,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do grpcDebug "serverHandleCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- runServerUnregOps call serverCQ recvOps timeLimit + opResults <- U.runServerOps call serverCQ recvOps timeLimit case opResults of Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" return $ Left x @@ -59,7 +60,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do let status = C.GrpcStatusOk let respOps = serverOpsSendNormalResponse respBody respMetadata status details - respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit + respOpsResults <- U.runServerOps call serverCQ respOps timeLimit case respOpsResults of Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." return $ Left x From b08ae78dd151022aaa799445bce5524b7082ecf8 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 13:13:51 -0500 Subject: [PATCH 6/9] comments --- src/Network/GRPC/LowLevel/Client.hs | 3 +++ src/Network/GRPC/LowLevel/CompletionQueue.hs | 6 ++++++ src/Network/GRPC/LowLevel/Op.hs | 9 ++++----- src/Network/GRPC/LowLevel/Server.hs | 3 +++ 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index ffa4479..b2324aa 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -1,5 +1,8 @@ {-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- clients using registered calls; for unregistered support, see +-- `Network.GRPC.LowLevel.Client.Unregistered`. module Network.GRPC.LowLevel.Client where import Control.Exception (bracket, finally) diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 42aaaff..d924e05 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -3,6 +3,12 @@ -- cause race conditions, so we only expose functions that are thread safe. -- However, some of the functions we export here can cause memory leaks if used -- improperly. +-- +-- When definition operations which pertain to calls, this module only provides +-- definitions for registered calls; for unregistered variants, see +-- `Network.GRPC.LowLevel.CompletionQueue.Unregistered`. Type definitions and +-- implementation details to both are kept in +-- `Network.GRPC.LowLevel.CompletionQueue.Internal`. {-# LANGUAGE RecordWildCards #-} diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 4950338..7ee0fe5 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -221,11 +221,10 @@ runOps call cq ops timeLimit = -- | For a given call, run the given 'Op's on the given completion queue with -- the given tag. Blocks until the ops are complete or the given number of --- seconds have elapsed. --- TODO: now that 'ServerRegCall' and 'U.ServerCall' are separate types, we --- could try to limit the input 'Op's more appropriately. E.g., we don't use --- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC --- handles that for us. +-- seconds have elapsed. TODO: now that we distinguish between different types +-- of calls at the type level, we could try to limit the input 'Op's more +-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a +-- registered call, because gRPC handles that for us. runServerRegOps :: ServerRegCall -- ^ 'Call' that this batch is associated with. One call can be -- associated with many batches. diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index f34d676..9ae18a2 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -1,5 +1,8 @@ {-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- servers using registered calls; for unregistered support, see +-- `Network.GRPC.LowLevel.Server.Unregistered`. module Network.GRPC.LowLevel.Server where import Control.Exception (bracket, finally) From 8069ebba078f1ecbf005df87713d3ebce62ecfa2 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 14:38:01 -0500 Subject: [PATCH 7/9] Rename reg operations in all modules; use qualified imports whenever selecting unregistered variants --- examples/echo/echo-client/Main.hs | 9 +- examples/echo/echo-server/Main.hs | 4 +- grpc-haskell.cabal | 2 +- src/Network/GRPC/LowLevel.hs | 10 +- src/Network/GRPC/LowLevel/Call.hs | 96 ++++++++-------- .../GRPC/LowLevel/Call/Unregistered.hs | 23 ++-- src/Network/GRPC/LowLevel/Client.hs | 68 +++++------ .../GRPC/LowLevel/Client/Unregistered.hs | 10 +- src/Network/GRPC/LowLevel/CompletionQueue.hs | 44 +++---- .../LowLevel/CompletionQueue/Unregistered.hs | 6 +- src/Network/GRPC/LowLevel/Op.hs | 55 +++++---- src/Network/GRPC/LowLevel/Op/Unregistered.hs | 2 +- src/Network/GRPC/LowLevel/Server.hs | 107 ++++++++++-------- .../GRPC/LowLevel/Server/Unregistered.hs | 6 +- tests/LowLevelTests.hs | 21 ++-- 15 files changed, 240 insertions(+), 223 deletions(-) diff --git a/examples/echo/echo-client/Main.hs b/examples/echo/echo-client/Main.hs index a430f1f..c5ff1cb 100644 --- a/examples/echo/echo-client/Main.hs +++ b/examples/echo/echo-client/Main.hs @@ -1,19 +1,20 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_GHC -fno-warn-unused-binds #-} import Control.Monad import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Client.Unregistered as U echoMethod = MethodName "/echo.Echo/DoEcho" unregistered c = do - clientRequest c echoMethod 1 "hi" mempty + U.clientRequest c echoMethod 1 "hi" mempty registered c = do meth <- clientRegisterMethod c echoMethod Normal - clientRegisteredRequest c meth 1 "hi" mempty + clientRequest c meth 1 "hi" mempty run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c -> f c >>= \case diff --git a/examples/echo/echo-server/Main.hs b/examples/echo/echo-server/Main.hs index 3a28d25..db89af2 100644 --- a/examples/echo/echo-server/Main.hs +++ b/examples/echo/echo-server/Main.hs @@ -33,7 +33,7 @@ regMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> forever $ do let method = head (registeredMethods server) - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of @@ -43,7 +43,7 @@ regMain = withGRPC $ \grpc -> do -- | loop to fork n times regLoop :: Server -> RegisteredMethod -> IO () regLoop server method = forever $ do - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 6375ae2..a94fb86 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -43,6 +43,7 @@ library Network.GRPC.Unsafe Network.GRPC.LowLevel Network.GRPC.LowLevel.Server.Unregistered + Network.GRPC.LowLevel.Client.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue Network.GRPC.LowLevel.CompletionQueue.Internal @@ -54,7 +55,6 @@ library Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Client - Network.GRPC.LowLevel.Client.Unregistered extra-libraries: grpc includes: diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index 7bab209..7485fda 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -29,11 +29,11 @@ GRPC -- * Server , ServerConfig(..) , Server -, ServerRegCall +, ServerCall , registeredMethods , withServer -, serverHandleNormalRegisteredCall -, withServerRegisteredCall +, serverHandleNormalCall +, withServerCall -- * Client , ClientConfig(..) @@ -43,13 +43,12 @@ GRPC , clientConnectivity , withClient , clientRegisterMethod -, clientRegisteredRequest , clientRequest , withClientCall -- * Ops , runClientOps -, runServerRegOps +, runServerOps , Op(..) , OpRecvResult(..) @@ -60,7 +59,6 @@ import Network.GRPC.LowLevel.Server import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client -import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Call import Network.GRPC.Unsafe (ConnectivityState(..)) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index 127b4ca..2907b01 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -54,32 +54,31 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType, -- | Represents one GRPC call (i.e. request) on the client. -- This is used to associate send/receive 'Op's with a request. -data ClientCall = ClientCall {internalClientCall :: C.Call} +data ClientCall = ClientCall { unClientCall :: C.Call } --- | Represents one registered GRPC call on the server. --- Contains pointers to all the C state needed to respond to a registered call. -data ServerRegCall = ServerRegCall - {internalServerRegCall :: C.Call, - requestMetadataRecvReg :: Ptr C.MetadataArray, - optionalPayload :: Ptr C.ByteBuffer, - parentPtrReg :: Maybe (Ptr C.Call), - callDeadline :: C.CTimeSpecPtr +-- | Represents one registered GRPC call on the server. Contains pointers to all +-- the C state needed to respond to a registered call. +data ServerCall = ServerCall + { unServerCall :: C.Call, + requestMetadataRecv :: Ptr C.MetadataArray, + optionalPayload :: Ptr C.ByteBuffer, + parentPtr :: Maybe (Ptr C.Call), + callDeadline :: C.CTimeSpecPtr } -serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap -serverRegCallGetMetadata ServerRegCall{..} = do - marray <- peek requestMetadataRecvReg +serverCallGetMetadata :: ServerCall -> IO MetadataMap +serverCallGetMetadata ServerCall{..} = do + marray <- peek requestMetadataRecv C.getAllMetadataArray marray --- | Extract the client request body from the given registered call, if present. --- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library --- calls the underlying out parameter "optional_payload". I am not sure exactly --- in what cases it won't be present. The C++ library checks a --- has_request_payload_ bool and passes in nullptr to request_registered_call --- if the bool is false, so we may be able to do the payload present/absent --- check earlier. -serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) -serverRegCallGetPayload ServerRegCall{..} = do +-- | Extract the client request body from the given call, if present. TODO: the +-- reason this returns @Maybe ByteString@ is because the gRPC library calls the +-- underlying out parameter "optional_payload". I am not sure exactly in what +-- cases it won't be present. The C++ library checks a has_request_payload_ bool +-- and passes in nullptr to request_registered_call if the bool is false, so we +-- may be able to do the payload present/absent check earlier. +serverCallGetPayload :: ServerCall -> IO (Maybe ByteString) +serverCallGetPayload ServerCall{..} = do bb@(C.ByteBuffer rawPtr) <- peek optionalPayload if rawPtr == nullPtr then return Nothing @@ -94,48 +93,47 @@ debugClientCall (ClientCall (C.Call ptr)) = debugClientCall = const $ return () #endif -debugServerRegCall :: ServerRegCall -> IO () +debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do - grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerRegCall: metadata ptr: " - ++ show (requestMetadataRecvReg call) - metadataArr <- peek (requestMetadataRecvReg call) +debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do + grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) + grpcDebug $ "debugServerCall(R): metadata ptr: " + ++ show (requestMetadataRecv call) + metadataArr <- peek (requestMetadataRecv call) metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata) - grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call) + grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata) + grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call) payload <- peek (optionalPayload call) bs <- C.copyByteBufferToByteString payload - grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs - forM_ (parentPtrReg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs + forM_ (parentPtr call) $ \parentPtr' -> do + grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr' (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call) + grpcDebug $ "debugServerCall(R): parent: " ++ show parent + grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) timespec <- peek (callDeadline call) - grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec) + grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec) #else -{-# INLINE debugServerRegCall #-} -debugServerRegCall = const $ return () +{-# INLINE debugServerCall #-} +debugServerCall = const $ return () #endif destroyClientCall :: ClientCall -> IO () destroyClientCall ClientCall{..} = do grpcDebug "Destroying client-side call object." - C.grpcCallDestroy internalClientCall + C.grpcCallDestroy unClientCall -destroyServerRegCall :: ServerRegCall -> IO () -destroyServerRegCall call@ServerRegCall{..} = do - grpcDebug "destroyServerRegCall: entered." - debugServerRegCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerRegCall - C.grpcCallDestroy internalServerRegCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg - C.metadataArrayDestroy requestMetadataRecvReg +destroyServerCall :: ServerCall -> IO () +destroyServerCall call@ServerCall{..} = do + grpcDebug "destroyServerCall(R): entered." + debugServerCall call + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv + C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "destroying optional payload" ++ show optionalPayload C.destroyReceivingByteBuffer optionalPayload - grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg - forM_ parentPtrReg free + grpcDebug $ "freeing parentPtr: " ++ show parentPtr + forM_ parentPtr free grpcDebug $ "destroying deadline." ++ show callDeadline C.timespecDestroy callDeadline diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs index 60307ef..dc714f3 100644 --- a/src/Network/GRPC/LowLevel/Call/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -3,21 +3,19 @@ module Network.GRPC.LowLevel.Call.Unregistered where import Control.Monad -import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr) -import Foreign.Storable (peek) - -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Metadata as C - -import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call (Host (..), MethodName (..)) +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Metadata as C -- | Represents one unregistered GRPC call on the server. -- Contains pointers to all the C state needed to respond to an unregistered -- call. data ServerCall = ServerCall - { internalServerCall :: C.Call + { unServerCall :: C.Call , requestMetadataRecv :: Ptr C.MetadataArray , parentPtr :: Maybe (Ptr C.Call) , callDetails :: C.CallDetails @@ -61,9 +59,8 @@ destroyServerCall :: ServerCall -> IO () destroyServerCall call@ServerCall{..} = do grpcDebug "destroyServerCall(U): entered." debugServerCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerCall - C.grpcCallDestroy internalServerCall + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "freeing parentPtr: " ++ show parentPtr diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index b2324aa..bbb2e8b 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -58,7 +58,7 @@ clientConnectivity Client{..} = C.grpcChannelCheckConnectivityState clientChannel False -- | Register a method on the client so that we can call it with --- 'clientRegisteredRequest'. +-- 'clientRequest'. clientRegisterMethod :: Client -> MethodName -- ^ method name, e.g. "/foo" @@ -74,27 +74,30 @@ clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented." -- | Create a new call on the client for a registered method. -- Returns 'Left' if the CQ is shutting down or if the job to create a call -- timed out. -clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> IO (Either GRPCIOError ClientCall) -clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do +clientCreateCall :: Client + -> RegisteredMethod + -> TimeoutSeconds + -> IO (Either GRPCIOError ClientCall) +clientCreateCall Client{..} RegisteredMethod{..} timeout = do let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though. C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateRegisteredCall clientChannel parentCall C.propagateDefaults - clientCQ methodHandle deadline + channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ methodHandle deadline -- TODO: the error-handling refactor made this quite ugly. It could be fixed -- by switching to ExceptT IO. -- | Handles safe creation and cleanup of a client call -withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> (ClientCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withClientRegisteredCall client regmethod timeout f = do - createResult <- clientCreateRegisteredCall client regmethod timeout +withClientCall :: Client + -> RegisteredMethod + -> TimeoutSeconds + -> (ClientCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withClientCall client regmethod timeout f = do + createResult <- clientCreateCall client regmethod timeout case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientRegisteredCall: destroying." + where logDestroy c = grpcDebug "withClientCall(R): destroying." >> destroyClientCall c data NormalRequestResult = NormalRequestResult @@ -131,25 +134,24 @@ compileNormalRequestResults x = -- server's response. TODO: This is preliminary until we figure out how many -- different variations on sending request ops will be needed for full gRPC -- functionality. -clientRegisteredRequest :: Client - -> RegisteredMethod - -> TimeoutSeconds - -- ^ Timeout of both the grpc_call and the - -- max time to wait for the completion of the batch. - -- TODO: I think we will need to decouple the - -- lifetime of the call from the queue deadline once - -- we expose functionality for streaming calls, where - -- one call object persists across many batches. - -> ByteString - -- ^ The body of the request. - -> MetadataMap - -- ^ Metadata to send with the request. - -> IO (Either GRPCIOError NormalRequestResult) -clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) - timeLimit body meta = +clientRequest :: Client + -> RegisteredMethod + -> TimeoutSeconds + -- ^ Timeout of both the grpc_call and the max time to wait for + -- the completion of the batch. TODO: I think we will need to + -- decouple the lifetime of the call from the queue deadline once + -- we expose functionality for streaming calls, where one call + -- object persists across many batches. + -> ByteString + -- ^ The body of the request + -> MetadataMap + -- ^ Metadata to send with the request + -> IO (Either GRPCIOError NormalRequestResult) +clientRequest client@(Client{..}) rm@(RegisteredMethod{..}) + timeLimit body meta = fmap join $ case methodType of - Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do - grpcDebug "clientRegisteredRequest: created call." + Normal -> withClientCall client rm timeLimit $ \call -> do + grpcDebug "clientRequest(R): created call." debugClientCall call -- NOTE: sendOps and recvOps *must* be in separate batches or -- the client hangs when the server can't be reached. @@ -158,7 +160,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) , OpSendCloseFromClient] sendRes <- runClientOps call clientCQ sendOps timeLimit case sendRes of - Left x -> do grpcDebug "clientRegisteredRequest: batch error." + Left x -> do grpcDebug "clientRequest(R) : batch error." return $ Left x Right rs -> do let recvOps = [OpRecvInitialMetadata, @@ -167,7 +169,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) recvRes <- runClientOps call clientCQ recvOps timeLimit case recvRes of Left x -> do - grpcDebug "clientRegisteredRequest: batch error." + grpcDebug "clientRequest(R): batch error." return $ Left x Right rs' -> do return $ Right $ compileNormalRequestResults (rs ++ rs') diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs index 7f5fa33..03b6082 100644 --- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -11,7 +11,11 @@ import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Time as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Client +import Network.GRPC.LowLevel.Client (Client (..), + NormalRequestResult (..), + clientEndpoint, + clientNormalRequestOps, + compileNormalRequestResults) import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC @@ -40,7 +44,7 @@ withClientCall client method timeout f = do case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientCall: destroying." + where logDestroy c = grpcDebug "withClientCall(U): destroying." >> destroyClientCall c -- | Makes a normal (non-streaming) request without needing to register a method @@ -61,7 +65,7 @@ clientRequest client@Client{..} meth timeLimit body meta = withClientCall client meth timeLimit $ \call -> do let ops = clientNormalRequestOps body meta results <- runClientOps call clientCQ ops timeLimit - grpcDebug "clientRequest: ops ran." + grpcDebug "clientRequest(U): ops ran." case results of Left x -> return $ Left x Right rs -> return $ Right $ compileNormalRequestResults rs diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index d924e05..3e78bb9 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -19,12 +19,12 @@ module Network.GRPC.LowLevel.CompletionQueue , shutdownCompletionQueue , pluck , startBatch - , channelCreateRegisteredCall + , channelCreateCall , TimeoutSeconds , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify - , serverRequestRegisteredCall + , serverRequestCall , newTag ) where @@ -104,14 +104,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do C.QueueTimeout -> drainLoop C.OpComplete -> drainLoop -channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask - -> CompletionQueue -> C.CallHandle - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateRegisteredCall +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> C.CallHandle + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} handle deadline = withPermission Push cq $ do - grpcDebug $ "channelCreateRegisteredCall: call with " + grpcDebug $ "channelCreateCall: call with " ++ concat (intersperse " " [show chan, show parent, show mask, show unsafeCQ, show handle, show deadline]) @@ -120,10 +123,13 @@ channelCreateRegisteredCall return $ Right $ ClientCall call -- | Create the call object to handle a registered call. -serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> RegisteredMethod -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverRequestRegisteredCall +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> RegisteredMethod + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta = withPermission Push cq $ do -- TODO: Is gRPC supposed to populate this deadline? @@ -146,28 +152,28 @@ serverRequestRegisteredCall callError <- C.grpcServerRequestRegisteredCall server methodHandle callPtr deadline metadataArray bbPtr unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestRegisteredCall: callError: " + grpcDebug $ "serverRequestCall(R): callError: " ++ show callError if callError /= C.CallOk - then do grpcDebug "serverRequestRegisteredCall: callError. cleaning up" + then do grpcDebug "serverRequestCall(R): callError. cleaning up" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag timeLimit - grpcDebug "serverRequestRegisteredCall: finished pluck." + grpcDebug "serverRequestCall(R): finished pluck." case pluckResult of Left x -> do - grpcDebug "serverRequestRegisteredCall: cleanup pluck err" + grpcDebug "serverRequestCall(R): cleanup pluck err" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left x Right () -> do rawCall <- peek callPtr - let assembledCall = ServerRegCall rawCall metadataArrayPtr - bbPtr Nothing deadline + let assembledCall = ServerCall rawCall metadataArrayPtr + bbPtr Nothing deadline return $ Right assembledCall -- TODO: see TODO for failureCleanup in serverRequestCall. where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do threadDelaySecs 30 - grpcDebug "serverRequestRegisteredCall: doing delayed cleanup." + grpcDebug "serverRequestCall(R): doing delayed cleanup." C.timespecDestroy deadline free callPtr C.metadataArrayDestroy metadataArrayPtr diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs index 40ea875..67cc169 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -29,8 +29,10 @@ channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = return $ Right $ ClientCall call -serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError U.ServerCall) +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> IO (Either GRPCIOError U.ServerCall) serverRequestCall server cq@CompletionQueue{..} timeLimit = withPermission Push cq $ do callPtr <- malloc diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 7ee0fe5..ce17f1d 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -4,23 +4,22 @@ module Network.GRPC.LowLevel.Op where import Control.Exception -import qualified Data.ByteString as B -import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) -import Foreign.C.String (CString) -import Foreign.C.Types (CInt) -import Foreign.Marshal.Alloc (free, malloc, - mallocBytes) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek, poke) -import qualified Network.GRPC.Unsafe as C (Call) -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Op as C - +import qualified Data.ByteString as B +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) +import Foreign.C.String (CString) +import Foreign.C.Types (CInt) +import Foreign.Marshal.Alloc (free, malloc, + mallocBytes) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek, poke) import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C (Call) +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C -- | Sum describing all possible send and receive operations that can be batched -- and executed by gRPC. Usually these are processed in a handful of @@ -219,26 +218,26 @@ runOps call cq ops timeLimit = fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err --- | For a given call, run the given 'Op's on the given completion queue with --- the given tag. Blocks until the ops are complete or the given number of +-- | For a given server call, run the given 'Op's on the given completion queue +-- with the given tag. Blocks until the ops are complete or the given number of -- seconds have elapsed. TODO: now that we distinguish between different types -- of calls at the type level, we could try to limit the input 'Op's more -- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a -- registered call, because gRPC handles that for us. -runServerRegOps :: ServerRegCall - -- ^ 'Call' that this batch is associated with. One call can be - -- associated with many batches. - -> CompletionQueue - -- ^ Queue on which our tag will be placed once our ops are done - -- running. - -> [Op] +runServerOps :: ServerCall + -- ^ 'Call' that this batch is associated with. One call can be + -- associated with many batches. + -> CompletionQueue + -- ^ Queue on which our tag will be placed once our ops are done + -- running. + -> [Op] -- ^ The list of 'Op's to execute. - -> TimeoutSeconds + -> TimeoutSeconds -- ^ How long to block waiting for the tag to appear on the - --queue. If we time out, the result of this action will be + -- queue. If we time out, the result of this action will be -- @CallBatchError BatchTimeout@. - -> IO (Either GRPCIOError [OpRecvResult]) -runServerRegOps = runOps . internalServerRegCall + -> IO (Either GRPCIOError [OpRecvResult]) +runServerOps = runOps . unServerCall -- | Like 'runServerOps', but for client-side calls. runClientOps :: ClientCall @@ -246,7 +245,7 @@ runClientOps :: ClientCall -> [Op] -> TimeoutSeconds -> IO (Either GRPCIOError [OpRecvResult]) -runClientOps = runOps . internalClientCall +runClientOps = runOps . unClientCall -- | If response status info is present in the given 'OpRecvResult's, returns -- a tuple of trailing metadata, status code, and status details. diff --git a/src/Network/GRPC/LowLevel/Op/Unregistered.hs b/src/Network/GRPC/LowLevel/Op/Unregistered.hs index c41b8a1..55b4077 100644 --- a/src/Network/GRPC/LowLevel/Op/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Op/Unregistered.hs @@ -10,4 +10,4 @@ runServerOps :: U.ServerCall -> [Op] -> TimeoutSeconds -> IO (Either GRPCIOError [OpRecvResult]) -runServerOps = runOps . U.internalServerCall +runServerOps = runOps . U.unServerCall diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 9ae18a2..b55151f 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -5,24 +5,23 @@ -- `Network.GRPC.LowLevel.Server.Unregistered`. module Network.GRPC.LowLevel.Server where -import Control.Exception (bracket, finally) +import Control.Exception (bracket, finally) import Control.Monad -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C - +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, - TimeoutSeconds, - createCompletionQueue, - pluck, - serverRegisterCompletionQueue, - serverRequestRegisteredCall, - serverShutdownAndNotify, - shutdownCompletionQueue) +import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, + TimeoutSeconds, + createCompletionQueue, + pluck, + serverRegisterCompletionQueue, + serverRequestCall, + serverShutdownAndNotify, + shutdownCompletionQueue) import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C -- | Wraps various gRPC state needed to run a server. data Server = Server @@ -123,24 +122,27 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- | Create a 'Call' with which to wait for the invocation of a registered -- method. -serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverCreateRegisteredCall Server{..} rm timeLimit initMeta = - serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta +serverCreateCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverCreateCall Server{..} rm timeLimit initMeta = + serverRequestCall internalServer serverCQ timeLimit rm initMeta -withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> (ServerRegCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withServerRegisteredCall server regmethod timeout initMeta f = do - createResult <- serverCreateRegisteredCall server regmethod timeout initMeta +withServerCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> (ServerCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withServerCall server regmethod timeout initMeta f = do + createResult <- serverCreateCall server regmethod timeout initMeta case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." - >> destroyServerRegCall c + >> destroyServerCall c -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. serverOpsGetNormalCall :: MetadataMap -> [Op] @@ -174,44 +176,49 @@ serverOpsSendNormalRegisteredResponse OpSendMessage body, OpSendStatusFromServer trailingMeta code details] +-- | A handler for an registered server call; bytestring parameter is request +-- body, with the bytestring response body in the result tuple. The first +-- metadata parameter refers to the request metadata, with the two metadata +-- values in the result tuple being the initial and trailing metadata +-- respectively. + +-- TODO: make a more rigid type for this with a Maybe MetadataMap for the +-- trailing meta, and use it for both kinds of call handlers. +type ServerHandler + = ByteString -> MetadataMap + -> IO (ByteString, MetadataMap, MetadataMap, StatusDetails) + -- TODO: we will want to replace this with some more general concept that also -- works with streaming calls in the future. -- | Wait for and then handle a normal (non-streaming) call. -serverHandleNormalRegisteredCall :: Server - -> RegisteredMethod - -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata - -> (ByteString -> MetadataMap - -> IO (ByteString, - MetadataMap, - MetadataMap, - StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and - -- metadata. - -> IO (Either GRPCIOError ()) -serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do +serverHandleNormalCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -- ^ Initial server metadata + -> ServerHandler + -> IO (Either GRPCIOError ()) +serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit. -- Should we just hard-code time limits instead? Not sure if client -- programmer cares, since this function will likely just be put in a loop -- anyway. - withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do - grpcDebug "serverHandleNormalRegisteredCall: starting batch." - debugServerRegCall call - payload <- serverRegCallGetPayload call + withServerCall s rm timeLimit srvMetadata $ \call -> do + grpcDebug "serverHandleNormalCall(R): starting batch." + debugServerCall call + payload <- serverCallGetPayload call case payload of --TODO: what should we do with an empty payload? Have the handler take -- @Maybe ByteString@? Need to figure out when/why payload would be empty. - Nothing -> error "serverHandleNormalRegisteredCall: payload empty." + Nothing -> error "serverHandleNormalCall(R): payload empty." Just requestBody -> do - requestMeta <- serverRegCallGetMetadata call + requestMeta <- serverCallGetMetadata call (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta let status = C.GrpcStatusOk let respOps = serverOpsSendNormalRegisteredResponse respBody initMeta trailingMeta status details - respOpsResults <- runServerRegOps call serverCQ respOps timeLimit - grpcDebug "serverHandleNormalRegisteredCall: finished response ops." + respOpsResults <- runServerOps call serverCQ respOps timeLimit + grpcDebug "serverHandleNormalCall(R): finished response ops." case respOpsResults of Left x -> return $ Left x Right _ -> return $ Right () diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 1a007d4..4069a01 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -11,7 +11,9 @@ import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op (OpRecvResult (..)) import qualified Network.GRPC.LowLevel.Op.Unregistered as U -import Network.GRPC.LowLevel.Server +import Network.GRPC.LowLevel.Server (Server (..), + serverOpsGetNormalCall, + serverOpsSendNormalResponse) import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server -> TimeoutSeconds @@ -44,7 +46,7 @@ serverHandleNormalCall :: Server -> IO (Either GRPCIOError ()) serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do - grpcDebug "serverHandleCall(U): starting batch." + grpcDebug "serverHandleNormalCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata opResults <- U.runServerOps call serverCQ recvOps timeLimit case opResults of diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 2e0032c..8f542ce 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -11,7 +11,8 @@ import Control.Monad import Data.ByteString (ByteString) import qualified Data.Map as M import Network.GRPC.LowLevel -import Network.GRPC.LowLevel.Server.Unregistered as U +import qualified Network.GRPC.LowLevel.Client.Unregistered as U +import qualified Network.GRPC.LowLevel.Server.Unregistered as U import Test.Tasty import Test.Tasty.HUnit as HU (Assertion, assertEqual, @@ -50,14 +51,14 @@ testClientCreateDestroy = testClientCall :: TestTree testClientCall = clientOnlyTest "create/destroy call" $ \c -> do - r <- withClientCall c "foo" 10 $ const $ return $ Right () + r <- U.withClientCall c "/foo" 10 $ const $ return $ Right () r @?= Right () testClientTimeoutNoServer :: TestTree testClientTimeoutNoServer = clientOnlyTest "request timeout when server DNE" $ \c -> do rm <- clientRegisterMethod c "/foo" Normal - r <- clientRegisteredRequest c rm 1 "Hello" mempty + r <- clientRequest c rm 1 "Hello" mempty r @?= Left GRPCIOTimeout testServerCreateDestroy :: TestTree @@ -74,7 +75,7 @@ testServerTimeoutNoClient :: TestTree testServerTimeoutNoClient = serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 1 mempty $ \_ _ -> + r <- serverHandleNormalCall s rm 1 mempty $ \_ _ -> return ("", mempty, mempty, StatusDetails "details") r @?= Left GRPCIOTimeout @@ -89,13 +90,13 @@ testWrongEndpoint = -- further client c = do rm <- clientRegisterMethod c "/bar" Normal - r <- clientRegisteredRequest c rm 1 "Hello!" mempty + r <- clientRequest c rm 1 "Hello!" mempty r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded (StatusDetails "Deadline Exceeded")) server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 10 mempty $ \_ _ -> do + r <- serverHandleNormalCall s rm 10 mempty $ \_ _ -> do return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") r @?= Right () @@ -112,7 +113,7 @@ testPayload = clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")] client c = do rm <- clientRegisterMethod c "/foo" Normal - clientRegisteredRequest c rm 10 "Hello!" clientMD >>= do + clientRequest c rm 10 "Hello!" clientMD >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" @@ -122,7 +123,7 @@ testPayload = server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 11 mempty $ \reqBody reqMD -> do + r <- serverHandleNormalCall s rm 11 mempty $ \reqBody reqMD -> do reqBody @?= "Hello!" checkMD "Server metadata mismatch" clientMD reqMD return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") @@ -133,13 +134,13 @@ testPayloadUnregistered = csTest "unregistered normal request/response" client server [] where client c = do - clientRequest c "/foo" 10 "Hello!" mempty >>= do + U.clientRequest c "/foo" 10 "Hello!" mempty >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" details @?= "details string" server s = do - r <- serverHandleNormalCall s 11 mempty $ \body _md meth -> do + r <- U.serverHandleNormalCall s 11 mempty $ \body _md meth -> do body @?= "Hello!" meth @?= "/foo" return ("reply test", mempty, "details string") From 4780a0c8ed6c9b0cde52ff5d19cd8b10b58a93f9 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 15:03:35 -0500 Subject: [PATCH 8/9] Drop distinction between runServerOps and runClientOps --- src/Network/GRPC/LowLevel.hs | 2 - src/Network/GRPC/LowLevel/Client.hs | 5 +- .../GRPC/LowLevel/Client/Unregistered.hs | 23 +++--- src/Network/GRPC/LowLevel/Op.hs | 71 ++++++++----------- src/Network/GRPC/LowLevel/Server.hs | 2 +- 5 files changed, 44 insertions(+), 59 deletions(-) diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index 7485fda..3976d07 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -47,8 +47,6 @@ GRPC , withClientCall -- * Ops -, runClientOps -, runServerOps , Op(..) , OpRecvResult(..) diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index bbb2e8b..44d78a1 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -153,12 +153,13 @@ clientRequest client@(Client{..}) rm@(RegisteredMethod{..}) Normal -> withClientCall client rm timeLimit $ \call -> do grpcDebug "clientRequest(R): created call." debugClientCall call + let call' = unClientCall call -- NOTE: sendOps and recvOps *must* be in separate batches or -- the client hangs when the server can't be reached. let sendOps = [OpSendInitialMetadata meta , OpSendMessage body , OpSendCloseFromClient] - sendRes <- runClientOps call clientCQ sendOps timeLimit + sendRes <- runOps call' clientCQ sendOps timeLimit case sendRes of Left x -> do grpcDebug "clientRequest(R) : batch error." return $ Left x @@ -166,7 +167,7 @@ clientRequest client@(Client{..}) rm@(RegisteredMethod{..}) let recvOps = [OpRecvInitialMetadata, OpRecvMessage, OpRecvStatusOnClient] - recvRes <- runClientOps call clientCQ recvOps timeLimit + recvRes <- runOps call' clientCQ recvOps timeLimit case recvRes of Left x -> do grpcDebug "clientRequest(R): batch error." diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs index 03b6082..1623801 100644 --- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -48,23 +48,22 @@ withClientCall client method timeout f = do >> destroyClientCall c -- | Makes a normal (non-streaming) request without needing to register a method --- first. Probably only useful for testing. TODO: This is preliminary, like --- 'clientRegisteredRequest'. +-- first. Probably only useful for testing. clientRequest :: Client - -> MethodName - -- ^ Method name, e.g. "/foo" - -> TimeoutSeconds - -- ^ "Number of seconds until request times out" - -> ByteString - -- ^ Request body. - -> MetadataMap - -- ^ Request metadata. - -> IO (Either GRPCIOError NormalRequestResult) + -> MethodName + -- ^ Method name, e.g. "/foo" + -> TimeoutSeconds + -- ^ "Number of seconds until request times out" + -> ByteString + -- ^ Request body. + -> MetadataMap + -- ^ Request metadata. + -> IO (Either GRPCIOError NormalRequestResult) clientRequest client@Client{..} meth timeLimit body meta = fmap join $ do withClientCall client meth timeLimit $ \call -> do let ops = clientNormalRequestOps body meta - results <- runClientOps call clientCQ ops timeLimit + results <- runOps (unClientCall call) clientCQ ops timeLimit grpcDebug "clientRequest(U): ops ran." case results of Left x -> return $ Left x diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index ce17f1d..b4fa0a6 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -181,21 +181,37 @@ resultFromOpContext _ = do grpcDebug "resultFromOpContext: saw non-result op type." return Nothing ---TODO: the list of 'Op's type is less specific than it could be. There are only --- a few different sequences of 'Op's we will see in practice. Once we figure --- out what those are, we should create a more specific sum type. However, since --- ops can fail, the list of 'OpRecvResult' returned by 'runOps' can vary in --- their contents and are perhaps less amenable to simplification. --- In the meantime, from looking at the core tests, it looks like it is safe to --- say that we always get a GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use --- the same 'Op' twice in the same batch, so we might want to change the list to --- a set. I don't think order matters within a batch. Need to check. +-- | For a given call, run the given 'Op's on the given completion queue with +-- the given tag. Blocks until the ops are complete or the given number of +-- seconds have elapsed. TODO: now that we distinguish between different types +-- of calls at the type level, we could try to limit the input 'Op's more +-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a +-- registered call, because gRPC handles that for us. + +-- TODO: the list of 'Op's type is less specific than it could be. There are +-- only a few different sequences of 'Op's we will see in practice. Once we +-- figure out what those are, we should create a more specific sum +-- type. However, since ops can fail, the list of 'OpRecvResult' returned by +-- 'runOps' can vary in their contents and are perhaps less amenable to +-- simplification. In the meantime, from looking at the core tests, it looks +-- like it is safe to say that we always get a +-- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use the same 'Op' twice in +-- the same batch, so we might want to change the list to a set. I don't think +-- order matters within a batch. Need to check. runOps :: C.Call - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) + -- ^ 'Call' that this batch is associated with. One call can be + -- associated with many batches. + -> CompletionQueue + -- ^ Queue on which our tag will be placed once our ops are done + -- running. + -> [Op] + -- ^ The list of 'Op's to execute. + -> TimeoutSeconds + -- ^ How long to block waiting for the tag to appear on the queue. If + -- we time out, the result of this action will be @CallBatchError + -- BatchTimeout@. + -> IO (Either GRPCIOError [OpRecvResult]) runOps call cq ops timeLimit = let l = length ops in withOpArray l $ \opArray -> do @@ -218,35 +234,6 @@ runOps call cq ops timeLimit = fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err --- | For a given server call, run the given 'Op's on the given completion queue --- with the given tag. Blocks until the ops are complete or the given number of --- seconds have elapsed. TODO: now that we distinguish between different types --- of calls at the type level, we could try to limit the input 'Op's more --- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a --- registered call, because gRPC handles that for us. -runServerOps :: ServerCall - -- ^ 'Call' that this batch is associated with. One call can be - -- associated with many batches. - -> CompletionQueue - -- ^ Queue on which our tag will be placed once our ops are done - -- running. - -> [Op] - -- ^ The list of 'Op's to execute. - -> TimeoutSeconds - -- ^ How long to block waiting for the tag to appear on the - -- queue. If we time out, the result of this action will be - -- @CallBatchError BatchTimeout@. - -> IO (Either GRPCIOError [OpRecvResult]) -runServerOps = runOps . unServerCall - --- | Like 'runServerOps', but for client-side calls. -runClientOps :: ClientCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runClientOps = runOps . unClientCall - -- | If response status info is present in the given 'OpRecvResult's, returns -- a tuple of trailing metadata, status code, and status details. extractStatusInfo :: [OpRecvResult] diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index b55151f..aaca99b 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -217,7 +217,7 @@ serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do let status = C.GrpcStatusOk let respOps = serverOpsSendNormalRegisteredResponse respBody initMeta trailingMeta status details - respOpsResults <- runServerOps call serverCQ respOps timeLimit + respOpsResults <- runOps (unServerCall call) serverCQ respOps timeLimit grpcDebug "serverHandleNormalCall(R): finished response ops." case respOpsResults of Left x -> return $ Left x From acefc35b8fa9ec5d8e7d2be88521921957f712f2 Mon Sep 17 00:00:00 2001 From: Joel Stanley Date: Wed, 8 Jun 2016 15:53:09 -0500 Subject: [PATCH 9/9] Oops: remove runServerOps, -Network.GRPC.LowLevel.Op.Unregistered module as it is no longer needed --- grpc-haskell.cabal | 1 - src/Network/GRPC/LowLevel/Op/Unregistered.hs | 13 ------------- src/Network/GRPC/LowLevel/Server/Unregistered.hs | 12 ++++++------ 3 files changed, 6 insertions(+), 20 deletions(-) delete mode 100644 src/Network/GRPC/LowLevel/Op/Unregistered.hs diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index a94fb86..2522f79 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -50,7 +50,6 @@ library Network.GRPC.LowLevel.CompletionQueue.Unregistered Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op - Network.GRPC.LowLevel.Op.Unregistered Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered diff --git a/src/Network/GRPC/LowLevel/Op/Unregistered.hs b/src/Network/GRPC/LowLevel/Op/Unregistered.hs deleted file mode 100644 index 55b4077..0000000 --- a/src/Network/GRPC/LowLevel/Op/Unregistered.hs +++ /dev/null @@ -1,13 +0,0 @@ -module Network.GRPC.LowLevel.Op.Unregistered where - -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.Op -import Network.GRPC.LowLevel.CompletionQueue -import Network.GRPC.LowLevel.Call.Unregistered as U - -runServerOps :: U.ServerCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runServerOps = runOps . U.unServerCall diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 4069a01..e513bac 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -7,10 +7,9 @@ import Data.ByteString (ByteString) import Network.GRPC.LowLevel.Call (MethodName) import Network.GRPC.LowLevel.Call.Unregistered import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) -import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U +import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall) import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.Op (OpRecvResult (..)) -import qualified Network.GRPC.LowLevel.Op.Unregistered as U +import Network.GRPC.LowLevel.Op (OpRecvResult (..), runOps) import Network.GRPC.LowLevel.Server (Server (..), serverOpsGetNormalCall, serverOpsSendNormalResponse) @@ -19,7 +18,7 @@ import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server -> TimeoutSeconds -> IO (Either GRPCIOError ServerCall) serverCreateCall Server{..} timeLimit = - U.serverRequestCall internalServer serverCQ timeLimit + serverRequestCall internalServer serverCQ timeLimit withServerCall :: Server -> TimeoutSeconds -> (ServerCall -> IO (Either GRPCIOError a)) @@ -48,7 +47,8 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do grpcDebug "serverHandleNormalCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- U.runServerOps call serverCQ recvOps timeLimit + call' = unServerCall call + opResults <- runOps call' serverCQ recvOps timeLimit case opResults of Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" return $ Left x @@ -62,7 +62,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do let status = C.GrpcStatusOk let respOps = serverOpsSendNormalResponse respBody respMetadata status details - respOpsResults <- U.runServerOps call serverCQ respOps timeLimit + respOpsResults <- runOps call' serverCQ respOps timeLimit case respOpsResults of Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." return $ Left x