diff --git a/examples/echo/echo-client/Main.hs b/examples/echo/echo-client/Main.hs index a430f1f..c5ff1cb 100644 --- a/examples/echo/echo-client/Main.hs +++ b/examples/echo/echo-client/Main.hs @@ -1,19 +1,20 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_GHC -fno-warn-unused-binds #-} import Control.Monad import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Client.Unregistered as U echoMethod = MethodName "/echo.Echo/DoEcho" unregistered c = do - clientRequest c echoMethod 1 "hi" mempty + U.clientRequest c echoMethod 1 "hi" mempty registered c = do meth <- clientRegisterMethod c echoMethod Normal - clientRegisteredRequest c meth 1 "hi" mempty + clientRequest c meth 1 "hi" mempty run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c -> f c >>= \case diff --git a/examples/echo/echo-server/Main.hs b/examples/echo/echo-server/Main.hs index 30d8b2b..db89af2 100644 --- a/examples/echo/echo-server/Main.hs +++ b/examples/echo/echo-server/Main.hs @@ -1,18 +1,20 @@ +{-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +{-# OPTIONS_GHC -fno-warn-unused-binds #-} -import Control.Concurrent.Async (async, wait) -import Control.Monad (forever) -import Data.ByteString (ByteString) -import qualified Data.Map as M -import Network.GRPC.LowLevel - +import Control.Concurrent.Async (async, wait) +import Control.Monad (forever) +import Data.ByteString (ByteString) +import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Server.Unregistered as U serverMeta :: MetadataMap -serverMeta = M.fromList [("test_meta", "test_meta_value")] +serverMeta = [("test_meta", "test_meta_value")] handler :: ByteString -> MetadataMap -> MethodName -> IO (ByteString, MetadataMap, StatusDetails) -handler reqBody reqMeta method = do +handler reqBody _reqMeta _method = do --putStrLn $ "Got request for method: " ++ show method --putStrLn $ "Got metadata: " ++ show reqMeta return (reqBody, serverMeta, StatusDetails "") @@ -20,7 +22,7 @@ handler reqBody reqMeta method = do unregMain :: IO () unregMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do - result <- serverHandleNormalCall server 15 serverMeta handler + result <- U.serverHandleNormalCall server 15 serverMeta handler case result of Left x -> putStrLn $ "handle call result error: " ++ show x Right _ -> return () @@ -31,7 +33,7 @@ regMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> forever $ do let method = head (registeredMethods server) - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of @@ -41,7 +43,7 @@ regMain = withGRPC $ \grpc -> do -- | loop to fork n times regLoop :: Server -> RegisteredMethod -> IO () regLoop server method = forever $ do - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 72373f8..2522f79 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -42,12 +42,17 @@ library Network.GRPC.Unsafe.Op Network.GRPC.Unsafe Network.GRPC.LowLevel + Network.GRPC.LowLevel.Server.Unregistered + Network.GRPC.LowLevel.Client.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue + Network.GRPC.LowLevel.CompletionQueue.Internal + Network.GRPC.LowLevel.CompletionQueue.Unregistered Network.GRPC.LowLevel.GRPC Network.GRPC.LowLevel.Op Network.GRPC.LowLevel.Server Network.GRPC.LowLevel.Call + Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Client extra-libraries: grpc diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index aa96213..3976d07 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -29,14 +29,11 @@ GRPC -- * Server , ServerConfig(..) , Server -, ServerRegCall -, ServerUnregCall +, ServerCall , registeredMethods , withServer -, serverHandleNormalRegisteredCall , serverHandleNormalCall -, withServerUnregCall -, withServerRegisteredCall +, withServerCall -- * Client , ClientConfig(..) @@ -46,14 +43,10 @@ GRPC , clientConnectivity , withClient , clientRegisterMethod -, clientRegisteredRequest , clientRequest , withClientCall -- * Ops -, runClientOps -, runServerRegOps -, runServerUnregOps , Op(..) , OpRecvResult(..) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index d8349ca..2907b01 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -1,21 +1,24 @@ -{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- calls; for unregistered call support, see +-- `Network.GRPC.LowLevel.Call.Unregistered`. module Network.GRPC.LowLevel.Call where import Control.Monad -import Data.ByteString (ByteString) -import Data.String (IsString) -import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek) +import Data.ByteString (ByteString) +import Data.String (IsString) +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Time as C -import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C -import Network.GRPC.LowLevel.GRPC (grpcDebug, MetadataMap) +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) -- | Models the four types of RPC call supported by gRPC. We currently only -- support the first alternative, and only in a preliminary fashion. @@ -51,59 +54,36 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType, -- | Represents one GRPC call (i.e. request) on the client. -- This is used to associate send/receive 'Op's with a request. -data ClientCall = ClientCall {internalClientCall :: C.Call} +data ClientCall = ClientCall { unClientCall :: C.Call } --- | Represents one registered GRPC call on the server. --- Contains pointers to all the C state needed to respond to a registered call. -data ServerRegCall = ServerRegCall - {internalServerRegCall :: C.Call, - requestMetadataRecvReg :: Ptr C.MetadataArray, - optionalPayload :: Ptr C.ByteBuffer, - parentPtrReg :: Maybe (Ptr C.Call), - callDeadline :: C.CTimeSpecPtr +-- | Represents one registered GRPC call on the server. Contains pointers to all +-- the C state needed to respond to a registered call. +data ServerCall = ServerCall + { unServerCall :: C.Call, + requestMetadataRecv :: Ptr C.MetadataArray, + optionalPayload :: Ptr C.ByteBuffer, + parentPtr :: Maybe (Ptr C.Call), + callDeadline :: C.CTimeSpecPtr } -serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap -serverRegCallGetMetadata ServerRegCall{..} = do - marray <- peek requestMetadataRecvReg +serverCallGetMetadata :: ServerCall -> IO MetadataMap +serverCallGetMetadata ServerCall{..} = do + marray <- peek requestMetadataRecv C.getAllMetadataArray marray --- | Extract the client request body from the given registered call, if present. --- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library --- calls the underlying out parameter "optional_payload". I am not sure exactly --- in what cases it won't be present. The C++ library checks a --- has_request_payload_ bool and passes in nullptr to request_registered_call --- if the bool is false, so we may be able to do the payload present/absent --- check earlier. -serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) -serverRegCallGetPayload ServerRegCall{..} = do +-- | Extract the client request body from the given call, if present. TODO: the +-- reason this returns @Maybe ByteString@ is because the gRPC library calls the +-- underlying out parameter "optional_payload". I am not sure exactly in what +-- cases it won't be present. The C++ library checks a has_request_payload_ bool +-- and passes in nullptr to request_registered_call if the bool is false, so we +-- may be able to do the payload present/absent check earlier. +serverCallGetPayload :: ServerCall -> IO (Maybe ByteString) +serverCallGetPayload ServerCall{..} = do bb@(C.ByteBuffer rawPtr) <- peek optionalPayload if rawPtr == nullPtr then return Nothing else Just <$> C.copyByteBufferToByteString bb --- | Represents one unregistered GRPC call on the server. --- Contains pointers to all the C state needed to respond to an unregistered --- call. -data ServerUnregCall = ServerUnregCall - {internalServerUnregCall :: C.Call, - requestMetadataRecvUnreg :: Ptr C.MetadataArray, - parentPtrUnreg :: Maybe (Ptr C.Call), - callDetails :: C.CallDetails} - -serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap -serverUnregCallGetMetadata ServerUnregCall{..} = do - marray <- peek requestMetadataRecvUnreg - C.getAllMetadataArray marray - -serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName -serverUnregCallGetMethodName ServerUnregCall{..} = - MethodName <$> C.callDetailsGetMethod callDetails - -serverUnregCallGetHost :: ServerUnregCall -> IO Host -serverUnregCallGetHost ServerUnregCall{..} = - Host <$> C.callDetailsGetHost callDetails - debugClientCall :: ClientCall -> IO () {-# INLINE debugClientCall #-} #ifdef DEBUG @@ -113,84 +93,47 @@ debugClientCall (ClientCall (C.Call ptr)) = debugClientCall = const $ return () #endif -debugServerRegCall :: ServerRegCall -> IO () +debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do - grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerRegCall: metadata ptr: " - ++ show (requestMetadataRecvReg call) - metadataArr <- peek (requestMetadataRecvReg call) +debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do + grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) + grpcDebug $ "debugServerCall(R): metadata ptr: " + ++ show (requestMetadataRecv call) + metadataArr <- peek (requestMetadataRecv call) metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata) - grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call) + grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata) + grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call) payload <- peek (optionalPayload call) bs <- C.copyByteBufferToByteString payload - grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs - forM_ (parentPtrReg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs + forM_ (parentPtr call) $ \parentPtr' -> do + grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr' (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call) + grpcDebug $ "debugServerCall(R): parent: " ++ show parent + grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) timespec <- peek (callDeadline call) - grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec) + grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec) #else -{-# INLINE debugServerRegCall #-} -debugServerRegCall = const $ return () +{-# INLINE debugServerCall #-} +debugServerCall = const $ return () #endif -debugServerUnregCall :: ServerUnregCall -> IO () -#ifdef DEBUG -debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do - grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerUnregCall: metadata ptr: " - ++ show (requestMetadataRecvUnreg call) - metadataArr <- peek (requestMetadataRecvUnreg call) - metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata) - forM_ (parentPtrUnreg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' - (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerUnregCall: callDetails ptr: " - ++ show (callDetails call) - --TODO: need functions for getting data out of call_details. -#else -{-# INLINE debugServerUnregCall #-} -debugServerUnregCall = const $ return () -#endif - - destroyClientCall :: ClientCall -> IO () destroyClientCall ClientCall{..} = do grpcDebug "Destroying client-side call object." - C.grpcCallDestroy internalClientCall + C.grpcCallDestroy unClientCall -destroyServerRegCall :: ServerRegCall -> IO () -destroyServerRegCall call@ServerRegCall{..} = do - grpcDebug "destroyServerRegCall: entered." - debugServerRegCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerRegCall - C.grpcCallDestroy internalServerRegCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg - C.metadataArrayDestroy requestMetadataRecvReg +destroyServerCall :: ServerCall -> IO () +destroyServerCall call@ServerCall{..} = do + grpcDebug "destroyServerCall(R): entered." + debugServerCall call + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv + C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "destroying optional payload" ++ show optionalPayload C.destroyReceivingByteBuffer optionalPayload - grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg - forM_ parentPtrReg free + grpcDebug $ "freeing parentPtr: " ++ show parentPtr + forM_ parentPtr free grpcDebug $ "destroying deadline." ++ show callDeadline C.timespecDestroy callDeadline - -destroyServerUnregCall :: ServerUnregCall -> IO () -destroyServerUnregCall call@ServerUnregCall{..} = do - grpcDebug "destroyServerUnregCall: entered." - debugServerUnregCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerUnregCall - C.grpcCallDestroy internalServerUnregCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg - C.metadataArrayDestroy requestMetadataRecvUnreg - grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg - forM_ parentPtrUnreg free - grpcDebug $ "destroying call details: " ++ show callDetails - C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs new file mode 100644 index 0000000..dc714f3 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -0,0 +1,69 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Call.Unregistered where + +import Control.Monad +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call (Host (..), MethodName (..)) +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Metadata as C + +-- | Represents one unregistered GRPC call on the server. +-- Contains pointers to all the C state needed to respond to an unregistered +-- call. +data ServerCall = ServerCall + { unServerCall :: C.Call + , requestMetadataRecv :: Ptr C.MetadataArray + , parentPtr :: Maybe (Ptr C.Call) + , callDetails :: C.CallDetails + } + +serverCallGetMetadata :: ServerCall -> IO MetadataMap +serverCallGetMetadata ServerCall{..} = do + marray <- peek requestMetadataRecv + C.getAllMetadataArray marray + +serverCallGetMethodName :: ServerCall -> IO MethodName +serverCallGetMethodName ServerCall{..} = + MethodName <$> C.callDetailsGetMethod callDetails + +serverCallGetHost :: ServerCall -> IO Host +serverCallGetHost ServerCall{..} = + Host <$> C.callDetailsGetHost callDetails + +debugServerCall :: ServerCall -> IO () +#ifdef DEBUG +debugServerCall call@(ServerCall (C.Call ptr) _ _ _) = do + grpcDebug $ "debugServerCall(U): server call: " ++ (show ptr) + grpcDebug $ "debugServerCall(U): metadata ptr: " + ++ show (requestMetadataRecv call) + metadataArr <- peek (requestMetadataRecv call) + metadata <- C.getAllMetadataArray metadataArr + grpcDebug $ "debugServerCall(U): metadata received: " ++ (show metadata) + forM_ (parentPtr call) $ \parentPtr' -> do + grpcDebug $ "debugServerCall(U): parent ptr: " ++ show parentPtr' + (C.Call parent) <- peek parentPtr' + grpcDebug $ "debugServerCall(U): parent: " ++ show parent + grpcDebug $ "debugServerCall(U): callDetails ptr: " + ++ show (callDetails call) + --TODO: need functions for getting data out of call_details. +#else +{-# INLINE debugServerCall #-} +debugServerCall = const $ return () +#endif + +destroyServerCall :: ServerCall -> IO () +destroyServerCall call@ServerCall{..} = do + grpcDebug "destroyServerCall(U): entered." + debugServerCall call + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv + C.metadataArrayDestroy requestMetadataRecv + grpcDebug $ "freeing parentPtr: " ++ show parentPtr + forM_ parentPtr free + grpcDebug $ "destroying call details: " ++ show callDetails + C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index 3b57381..44d78a1 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -1,25 +1,28 @@ {-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- clients using registered calls; for unregistered support, see +-- `Network.GRPC.LowLevel.Client.Unregistered`. module Network.GRPC.LowLevel.Client where -import Control.Exception (bracket, finally) -import Control.Monad (join) -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Time as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Op as C +import Control.Exception (bracket, finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Time as C -import Network.GRPC.LowLevel.GRPC -import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op -- | Represents the context needed to perform client-side gRPC operations. data Client = Client {clientChannel :: C.Channel, - clientCQ :: CompletionQueue, - clientConfig :: ClientConfig + clientCQ :: CompletionQueue, + clientConfig :: ClientConfig } -- | Configuration necessary to set up a client. @@ -55,7 +58,7 @@ clientConnectivity Client{..} = C.grpcChannelCheckConnectivityState clientChannel False -- | Register a method on the client so that we can call it with --- 'clientRegisteredRequest'. +-- 'clientRequest'. clientRegisterMethod :: Client -> MethodName -- ^ method name, e.g. "/foo" @@ -71,54 +74,31 @@ clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented." -- | Create a new call on the client for a registered method. -- Returns 'Left' if the CQ is shutting down or if the job to create a call -- timed out. -clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> IO (Either GRPCIOError ClientCall) -clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do +clientCreateCall :: Client + -> RegisteredMethod + -> TimeoutSeconds + -> IO (Either GRPCIOError ClientCall) +clientCreateCall Client{..} RegisteredMethod{..} timeout = do let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though. C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateRegisteredCall clientChannel parentCall C.propagateDefaults - clientCQ methodHandle deadline + channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ methodHandle deadline -- TODO: the error-handling refactor made this quite ugly. It could be fixed -- by switching to ExceptT IO. -- | Handles safe creation and cleanup of a client call -withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> (ClientCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withClientRegisteredCall client regmethod timeout f = do - createResult <- clientCreateRegisteredCall client regmethod timeout - case createResult of - Left x -> return $ Left x - Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientRegisteredCall: destroying." - >> destroyClientCall c - --- | Create a call on the client for an endpoint without using the --- method registration machinery. In practice, we'll probably only use the --- registered method version, but we include this for completeness and testing. -clientCreateCall :: Client - -> MethodName - -> TimeoutSeconds - -> IO (Either GRPCIOError ClientCall) -clientCreateCall Client{..} meth timeout = do - let parentCall = C.Call nullPtr - C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateCall clientChannel parentCall C.propagateDefaults - clientCQ meth (clientEndpoint clientConfig) deadline - withClientCall :: Client - -> MethodName + -> RegisteredMethod -> TimeoutSeconds -> (ClientCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withClientCall client method timeout f = do - createResult <- clientCreateCall client method timeout +withClientCall client regmethod timeout f = do + createResult <- clientCreateCall client regmethod timeout case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientCall: destroying." - >> destroyClientCall c + where logDestroy c = grpcDebug "withClientCall(R): destroying." + >> destroyClientCall c data NormalRequestResult = NormalRequestResult { rspBody :: ByteString @@ -154,71 +134,48 @@ compileNormalRequestResults x = -- server's response. TODO: This is preliminary until we figure out how many -- different variations on sending request ops will be needed for full gRPC -- functionality. -clientRegisteredRequest :: Client - -> RegisteredMethod - -> TimeoutSeconds - -- ^ Timeout of both the grpc_call and the - -- max time to wait for the completion of the batch. - -- TODO: I think we will need to decouple the - -- lifetime of the call from the queue deadline once - -- we expose functionality for streaming calls, where - -- one call object persists across many batches. - -> ByteString - -- ^ The body of the request. - -> MetadataMap - -- ^ Metadata to send with the request. - -> IO (Either GRPCIOError NormalRequestResult) -clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) - timeLimit body meta = +clientRequest :: Client + -> RegisteredMethod + -> TimeoutSeconds + -- ^ Timeout of both the grpc_call and the max time to wait for + -- the completion of the batch. TODO: I think we will need to + -- decouple the lifetime of the call from the queue deadline once + -- we expose functionality for streaming calls, where one call + -- object persists across many batches. + -> ByteString + -- ^ The body of the request + -> MetadataMap + -- ^ Metadata to send with the request + -> IO (Either GRPCIOError NormalRequestResult) +clientRequest client@(Client{..}) rm@(RegisteredMethod{..}) + timeLimit body meta = fmap join $ case methodType of - Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do - grpcDebug "clientRegisteredRequest: created call." + Normal -> withClientCall client rm timeLimit $ \call -> do + grpcDebug "clientRequest(R): created call." debugClientCall call + let call' = unClientCall call -- NOTE: sendOps and recvOps *must* be in separate batches or -- the client hangs when the server can't be reached. let sendOps = [OpSendInitialMetadata meta , OpSendMessage body , OpSendCloseFromClient] - sendRes <- runClientOps call clientCQ sendOps timeLimit + sendRes <- runOps call' clientCQ sendOps timeLimit case sendRes of - Left x -> do grpcDebug "clientRegisteredRequest: batch error." + Left x -> do grpcDebug "clientRequest(R) : batch error." return $ Left x Right rs -> do let recvOps = [OpRecvInitialMetadata, OpRecvMessage, OpRecvStatusOnClient] - recvRes <- runClientOps call clientCQ recvOps timeLimit + recvRes <- runOps call' clientCQ recvOps timeLimit case recvRes of Left x -> do - grpcDebug "clientRegisteredRequest: batch error." + grpcDebug "clientRequest(R): batch error." return $ Left x Right rs' -> do return $ Right $ compileNormalRequestResults (rs ++ rs') _ -> error "Streaming methods not yet implemented." --- | Makes a normal (non-streaming) request without needing to register a method --- first. Probably only useful for testing. TODO: This is preliminary, like --- 'clientRegisteredRequest'. -clientRequest :: Client - -> MethodName - -- ^ Method name, e.g. "/foo" - -> TimeoutSeconds - -- ^ "Number of seconds until request times out" - -> ByteString - -- ^ Request body. - -> MetadataMap - -- ^ Request metadata. - -> IO (Either GRPCIOError NormalRequestResult) -clientRequest client@Client{..} meth timeLimit body meta = - fmap join $ do - withClientCall client meth timeLimit $ \call -> do - let ops = clientNormalRequestOps body meta - results <- runClientOps call clientCQ ops timeLimit - grpcDebug "clientRequest: ops ran." - case results of - Left x -> return $ Left x - Right rs -> return $ Right $ compileNormalRequestResults rs - clientNormalRequestOps :: ByteString -> MetadataMap -> [Op] clientNormalRequestOps body metadata = [OpSendInitialMetadata metadata, diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs new file mode 100644 index 0000000..1623801 --- /dev/null +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -0,0 +1,70 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Client.Unregistered where + +import Control.Exception (finally) +import Control.Monad (join) +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C + +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.Client (Client (..), + NormalRequestResult (..), + clientEndpoint, + clientNormalRequestOps, + compileNormalRequestResults) +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op + +-- | Create a call on the client for an endpoint without using the +-- method registration machinery. In practice, we'll probably only use the +-- registered method version, but we include this for completeness and testing. +clientCreateCall :: Client + -> MethodName + -> TimeoutSeconds + -> IO (Either GRPCIOError ClientCall) +clientCreateCall Client{..} meth timeout = do + let parentCall = C.Call nullPtr + C.withDeadlineSeconds timeout $ \deadline -> do + U.channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ meth (clientEndpoint clientConfig) deadline + +withClientCall :: Client + -> MethodName + -> TimeoutSeconds + -> (ClientCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withClientCall client method timeout f = do + createResult <- clientCreateCall client method timeout + case createResult of + Left x -> return $ Left x + Right call -> f call `finally` logDestroy call + where logDestroy c = grpcDebug "withClientCall(U): destroying." + >> destroyClientCall c + +-- | Makes a normal (non-streaming) request without needing to register a method +-- first. Probably only useful for testing. +clientRequest :: Client + -> MethodName + -- ^ Method name, e.g. "/foo" + -> TimeoutSeconds + -- ^ "Number of seconds until request times out" + -> ByteString + -- ^ Request body. + -> MetadataMap + -- ^ Request metadata. + -> IO (Either GRPCIOError NormalRequestResult) +clientRequest client@Client{..} meth timeLimit body meta = + fmap join $ do + withClientCall client meth timeLimit $ \call -> do + let ops = clientNormalRequestOps body meta + results <- runOps (unClientCall call) clientCQ ops timeLimit + grpcDebug "clientRequest(U): ops ran." + case results of + Left x -> return $ Left x + Right rs -> return $ Right $ compileNormalRequestResults rs diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index dd14388..3e78bb9 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -3,6 +3,12 @@ -- cause race conditions, so we only expose functions that are thread safe. -- However, some of the functions we export here can cause memory leaks if used -- improperly. +-- +-- When definition operations which pertain to calls, this module only provides +-- definitions for registered calls; for unregistered variants, see +-- `Network.GRPC.LowLevel.CompletionQueue.Unregistered`. Type definitions and +-- implementation details to both are kept in +-- `Network.GRPC.LowLevel.CompletionQueue.Internal`. {-# LANGUAGE RecordWildCards #-} @@ -13,130 +19,35 @@ module Network.GRPC.LowLevel.CompletionQueue , shutdownCompletionQueue , pluck , startBatch - , channelCreateRegisteredCall , channelCreateCall , TimeoutSeconds , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify - , serverRequestRegisteredCall , serverRequestCall , newTag ) where -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, check, retry) -import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, - readTVar, writeTVar) -import Control.Exception (bracket) -import Data.IORef (IORef, atomicModifyIORef', - newIORef) -import Data.List (intersperse) -import Foreign.Marshal.Alloc (free, malloc) -import Foreign.Ptr (nullPtr, plusPtr) -import Foreign.Storable (peek) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Constants as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Op as C -import qualified Network.GRPC.Unsafe.Time as C -import System.Timeout (timeout) +import Control.Concurrent (forkIO) +import Control.Concurrent.STM (atomically, check) +import Control.Concurrent.STM.TVar (newTVarIO, readTVar, + writeTVar) +import Control.Exception (bracket) +import Data.IORef (newIORef) +import Data.List (intersperse) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Storable (peek) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C +import qualified Network.GRPC.Unsafe.Time as C +import System.Timeout (timeout) import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.GRPC - --- NOTE: the concurrency requirements for a CompletionQueue are a little --- complicated. There are two read operations: next and pluck. We can either --- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times --- concurrently, but we can't mix next and pluck calls. Fortunately, we only --- need to use next when we are shutting down the queue. Thus, we do two things --- to shut down: --- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck --- calls will be allowed to start. --- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. --- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. - --- NOTE: There is one more possible race condition: pushing work onto the queue --- after we begin to shut down. --- Solution: another counter, which must reach zero before the shutdown --- can start. - --- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of --- concurrent pushers to the CQ, but I don't know what the limit should be set --- to. I haven't found any documentation that suggests there is a limit imposed --- by the gRPC library, but there might be. Need to investigate further. - --- | Wraps the state necessary to use a gRPC completion queue. Completion queues --- are used to wait for batches gRPC operations ('Op's) to finish running, as --- well as wait for various other operations, such as server shutdown, pinging, --- checking to see if we've been disconnected, and so forth. -data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, - -- ^ All access to this field must be - -- guarded by a check of 'shuttingDown'. - currentPluckers :: TVar Int, - -- ^ Used to limit the number of - -- concurrent calls to pluck on this - -- queue. - -- The max value is set by gRPC in - -- 'C.maxCompletionQueuePluckers' - currentPushers :: TVar Int, - -- ^ Used to prevent new work from - -- being pushed onto the queue when - -- the queue begins to shut down. - shuttingDown :: TVar Bool, - -- ^ Used to prevent new pluck calls on - -- the queue when the queue begins to - -- shut down. - nextTag :: IORef Int - -- ^ Used to supply unique tags for work - -- items pushed onto the queue. - } - --- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. --- This will eventually wrap around after reaching @maxBound :: Int@, but from a --- practical perspective, that should be safe. -newTag :: CompletionQueue -> IO C.Tag -newTag CompletionQueue{..} = do - i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) - return $ C.Tag $ plusPtr nullPtr i - -maxWorkPushers :: Int -maxWorkPushers = 100 --TODO: figure out what this should be. - -data CQOpType = Push | Pluck deriving (Show, Eq, Enum) - -getCount :: CQOpType -> CompletionQueue -> TVar Int -getCount Push = currentPushers -getCount Pluck = currentPluckers - -getLimit :: CQOpType -> Int -getLimit Push = maxWorkPushers -getLimit Pluck = C.maxCompletionQueuePluckers - --- | Safely brackets an operation that pushes work onto or plucks results from --- the given 'CompletionQueue'. -withPermission :: CQOpType - -> CompletionQueue - -> IO (Either GRPCIOError a) - -> IO (Either GRPCIOError a) -withPermission op cq f = - bracket acquire release doOp - where acquire = atomically $ do - isShuttingDown <- readTVar (shuttingDown cq) - if isShuttingDown - then return False - else do currCount <- readTVar $ getCount op cq - if currCount < getLimit op - then modifyTVar' (getCount op cq) (+1) >> return True - else retry - doOp gotResource = if gotResource - then f - else return $ Left GRPCIOShutdown - release gotResource = - if gotResource - then atomically $ modifyTVar' (getCount op cq) (subtract 1) - else return () +import Network.GRPC.LowLevel.CompletionQueue.Internal withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a withCompletionQueue grpc = bracket (createCompletionQueue grpc) @@ -151,35 +62,6 @@ createCompletionQueue _ = do nextTag <- newIORef minBound return $ CompletionQueue{..} -type TimeoutSeconds = Int - --- | Translate 'C.Event' to an error. The caller is responsible for ensuring --- that the event actually corresponds to an error condition; a successful event --- will be translated to a 'GRPCIOUnknownError'. -eventToError :: C.Event -> (Either GRPCIOError a) -eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown -eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout -eventToError _ = Left GRPCIOUnknownError - --- | Returns true iff the given grpc_event was a success. -isEventSuccessful :: C.Event -> Bool -isEventSuccessful (C.Event C.OpComplete True _) = True -isEventSuccessful _ = False - --- | Waits for the given number of seconds for the given tag to appear on the --- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting --- down and cannot handle new requests. -pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds - -> IO (Either GRPCIOError ()) -pluck cq@CompletionQueue{..} tag waitSeconds = do - grpcDebug $ "pluck: called with tag: " ++ show tag - ++ " and wait: " ++ show waitSeconds - withPermission Pluck cq $ do - C.withDeadlineSeconds waitSeconds $ \deadline -> do - ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved - grpcDebug $ "pluck: finished. Event: " ++ show ev - return $ if isEventSuccessful ev then Right () else eventToError ev - -- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere -- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the -- case, switch these to 'Maybe'. @@ -222,14 +104,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do C.QueueTimeout -> drainLoop C.OpComplete -> drainLoop -channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask - -> CompletionQueue -> C.CallHandle - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateRegisteredCall +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> C.CallHandle + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} handle deadline = withPermission Push cq $ do - grpcDebug $ "channelCreateRegisteredCall: call with " + grpcDebug $ "channelCreateCall: call with " ++ concat (intersperse " " [show chan, show parent, show mask, show unsafeCQ, show handle, show deadline]) @@ -237,25 +122,14 @@ channelCreateRegisteredCall handle deadline C.reserved return $ Right $ ClientCall call -channelCreateCall :: C.Channel - -> C.Call - -> C.PropagationMask - -> CompletionQueue - -> MethodName - -> Endpoint - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = - withPermission Push cq $ do - call <- C.grpcChannelCreateCall chan parent mask unsafeCQ - (unMethodName meth) (unEndpoint endpt) deadline C.reserved - return $ Right $ ClientCall call - -- | Create the call object to handle a registered call. -serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> RegisteredMethod -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverRequestRegisteredCall +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> RegisteredMethod + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta = withPermission Push cq $ do -- TODO: Is gRPC supposed to populate this deadline? @@ -278,81 +152,33 @@ serverRequestRegisteredCall callError <- C.grpcServerRequestRegisteredCall server methodHandle callPtr deadline metadataArray bbPtr unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestRegisteredCall: callError: " + grpcDebug $ "serverRequestCall(R): callError: " ++ show callError if callError /= C.CallOk - then do grpcDebug "serverRequestRegisteredCall: callError. cleaning up" + then do grpcDebug "serverRequestCall(R): callError. cleaning up" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag timeLimit - grpcDebug "serverRequestRegisteredCall: finished pluck." + grpcDebug "serverRequestCall(R): finished pluck." case pluckResult of Left x -> do - grpcDebug "serverRequestRegisteredCall: cleanup pluck err" + grpcDebug "serverRequestCall(R): cleanup pluck err" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left x Right () -> do rawCall <- peek callPtr - let assembledCall = ServerRegCall rawCall metadataArrayPtr - bbPtr Nothing deadline + let assembledCall = ServerCall rawCall metadataArrayPtr + bbPtr Nothing deadline return $ Right assembledCall -- TODO: see TODO for failureCleanup in serverRequestCall. where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do threadDelaySecs 30 - grpcDebug "serverRequestRegisteredCall: doing delayed cleanup." + grpcDebug "serverRequestCall(R): doing delayed cleanup." C.timespecDestroy deadline free callPtr C.metadataArrayDestroy metadataArrayPtr free bbPtr -serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError ServerUnregCall) -serverRequestCall server cq@CompletionQueue{..} timeLimit = - withPermission Push cq $ do - callPtr <- malloc - grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr - callDetails <- C.createCallDetails - metadataArrayPtr <- C.metadataArrayCreate - metadataArray <- peek metadataArrayPtr - tag <- newTag cq - callError <- C.grpcServerRequestCall server callPtr callDetails - metadataArray unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestCall: callError was " ++ show callError - if callError /= C.CallOk - then do grpcDebug "serverRequestCall: got call error; cleaning up." - failureCleanup callPtr callDetails metadataArrayPtr - return $ Left $ GRPCIOCallError callError - else do pluckResult <- pluck cq tag timeLimit - grpcDebug $ "serverRequestCall: pluckResult was " - ++ show pluckResult - case pluckResult of - Left x -> do - grpcDebug "serverRequestCall: pluck error; cleaning up." - failureCleanup callPtr callDetails - metadataArrayPtr - return $ Left x - Right () -> do - rawCall <- peek callPtr - let call = ServerUnregCall rawCall - metadataArrayPtr - Nothing - callDetails - return $ Right call - - --TODO: the gRPC library appears to hold onto these pointers for a random - -- amount of time, even after returning from the only call that uses them. - -- This results in malloc errors if - -- gRPC tries to modify them after we free them. To work around it, - -- we sleep for a while before freeing the objects. We should find a - -- permanent solution that's more robust. - where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do - threadDelaySecs 30 - grpcDebug "serverRequestCall: doing delayed cleanup." - free callPtr - C.destroyCallDetails callDetails - C.metadataArrayDestroy metadataArrayPtr - return () - -- | Register the server's completion queue. Must be done before the server is -- started. serverRegisterCompletionQueue :: C.Server -> CompletionQueue -> IO () @@ -362,6 +188,3 @@ serverRegisterCompletionQueue server CompletionQueue{..} = serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO () serverShutdownAndNotify server CompletionQueue{..} tag = C.grpcServerShutdownAndNotify server unsafeCQ tag - -threadDelaySecs :: Int -> IO () -threadDelaySecs = threadDelay . (* 10^(6::Int)) diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs new file mode 100644 index 0000000..1facf67 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs @@ -0,0 +1,134 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Internal where + +import Control.Concurrent.STM (atomically, retry) +import Control.Concurrent.STM.TVar (TVar, modifyTVar', readTVar) +import Control.Exception (bracket) +import Data.IORef (IORef, atomicModifyIORef') +import Foreign.Ptr (nullPtr, plusPtr) +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Time as C + +-- NOTE: the concurrency requirements for a CompletionQueue are a little +-- complicated. There are two read operations: next and pluck. We can either +-- call next on a CQ or call pluck up to 'maxCompletionQueuePluckers' times +-- concurrently, but we can't mix next and pluck calls. Fortunately, we only +-- need to use next when we are shutting down the queue. Thus, we do two things +-- to shut down: +-- 1. Set the shuttingDown 'TVar' to 'True'. When this is set, no new pluck +-- calls will be allowed to start. +-- 2. Wait until no threads are plucking, as counted by 'currentPluckers'. +-- This logic can be seen in 'pluck' and 'shutdownCompletionQueue'. + +-- NOTE: There is one more possible race condition: pushing work onto the queue +-- after we begin to shut down. +-- Solution: another counter, which must reach zero before the shutdown +-- can start. + +-- TODO: 'currentPushers' currently imposes an arbitrary limit on the number of +-- concurrent pushers to the CQ, but I don't know what the limit should be set +-- to. I haven't found any documentation that suggests there is a limit imposed +-- by the gRPC library, but there might be. Need to investigate further. + +-- | Wraps the state necessary to use a gRPC completion queue. Completion queues +-- are used to wait for batches gRPC operations ('Op's) to finish running, as +-- well as wait for various other operations, such as server shutdown, pinging, +-- checking to see if we've been disconnected, and so forth. +data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue, + -- ^ All access to this field must be + -- guarded by a check of 'shuttingDown'. + currentPluckers :: TVar Int, + -- ^ Used to limit the number of + -- concurrent calls to pluck on this + -- queue. + -- The max value is set by gRPC in + -- 'C.maxCompletionQueuePluckers' + currentPushers :: TVar Int, + -- ^ Used to prevent new work from + -- being pushed onto the queue when + -- the queue begins to shut down. + shuttingDown :: TVar Bool, + -- ^ Used to prevent new pluck calls on + -- the queue when the queue begins to + -- shut down. + nextTag :: IORef Int + -- ^ Used to supply unique tags for work + -- items pushed onto the queue. + } + +type TimeoutSeconds = Int + +data CQOpType = Push | Pluck deriving (Show, Eq, Enum) + +-- | Create a new 'C.Tag' for identifying work items on the 'CompletionQueue'. +-- This will eventually wrap around after reaching @maxBound :: Int@, but from a +-- practical perspective, that should be safe. +newTag :: CompletionQueue -> IO C.Tag +newTag CompletionQueue{..} = do + i <- atomicModifyIORef' nextTag (\i -> (i+1,i)) + return $ C.Tag $ plusPtr nullPtr i + +-- | Safely brackets an operation that pushes work onto or plucks results from +-- the given 'CompletionQueue'. +withPermission :: CQOpType + -> CompletionQueue + -> IO (Either GRPCIOError a) + -> IO (Either GRPCIOError a) +withPermission op cq f = + bracket acquire release doOp + where acquire = atomically $ do + isShuttingDown <- readTVar (shuttingDown cq) + if isShuttingDown + then return False + else do currCount <- readTVar $ getCount op cq + if currCount < getLimit op + then modifyTVar' (getCount op cq) (+1) >> return True + else retry + doOp gotResource = if gotResource + then f + else return $ Left GRPCIOShutdown + release gotResource = + if gotResource + then atomically $ modifyTVar' (getCount op cq) (subtract 1) + else return () + +-- | Waits for the given number of seconds for the given tag to appear on the +-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting +-- down and cannot handle new requests. +pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds + -> IO (Either GRPCIOError ()) +pluck cq@CompletionQueue{..} tag waitSeconds = do + grpcDebug $ "pluck: called with tag: " ++ show tag + ++ " and wait: " ++ show waitSeconds + withPermission Pluck cq $ do + C.withDeadlineSeconds waitSeconds $ \deadline -> do + ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved + grpcDebug $ "pluck: finished. Event: " ++ show ev + return $ if isEventSuccessful ev then Right () else eventToError ev + +-- | Translate 'C.Event' to an error. The caller is responsible for ensuring +-- that the event actually corresponds to an error condition; a successful event +-- will be translated to a 'GRPCIOUnknownError'. +eventToError :: C.Event -> (Either GRPCIOError a) +eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown +eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout +eventToError _ = Left GRPCIOUnknownError + +-- | Returns true iff the given grpc_event was a success. +isEventSuccessful :: C.Event -> Bool +isEventSuccessful (C.Event C.OpComplete True _) = True +isEventSuccessful _ = False + +maxWorkPushers :: Int +maxWorkPushers = 100 --TODO: figure out what this should be. + +getCount :: CQOpType -> CompletionQueue -> TVar Int +getCount Push = currentPushers +getCount Pluck = currentPluckers + +getLimit :: CQOpType -> Int +getLimit Push = maxWorkPushers +getLimit Pluck = C.maxCompletionQueuePluckers diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs new file mode 100644 index 0000000..67cc169 --- /dev/null +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.CompletionQueue.Unregistered where + +import Control.Concurrent (forkIO) +import Foreign.Marshal.Alloc (free, malloc) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call +import qualified Network.GRPC.LowLevel.Call.Unregistered as U +import Network.GRPC.LowLevel.CompletionQueue.Internal +import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Constants as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Time as C + +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> MethodName + -> Endpoint + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = + withPermission Push cq $ do + call <- C.grpcChannelCreateCall chan parent mask unsafeCQ + (unMethodName meth) (unEndpoint endpt) deadline C.reserved + return $ Right $ ClientCall call + + +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> IO (Either GRPCIOError U.ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit = + withPermission Push cq $ do + callPtr <- malloc + grpcDebug $ "serverRequestCall: callPtr is " ++ show callPtr + callDetails <- C.createCallDetails + metadataArrayPtr <- C.metadataArrayCreate + metadataArray <- peek metadataArrayPtr + tag <- newTag cq + callError <- C.grpcServerRequestCall server callPtr callDetails + metadataArray unsafeCQ unsafeCQ tag + grpcDebug $ "serverRequestCall: callError was " ++ show callError + if callError /= C.CallOk + then do grpcDebug "serverRequestCall: got call error; cleaning up." + failureCleanup callPtr callDetails metadataArrayPtr + return $ Left $ GRPCIOCallError callError + else do pluckResult <- pluck cq tag timeLimit + grpcDebug $ "serverRequestCall: pluckResult was " + ++ show pluckResult + case pluckResult of + Left x -> do + grpcDebug "serverRequestCall: pluck error; cleaning up." + failureCleanup callPtr callDetails + metadataArrayPtr + return $ Left x + Right () -> do + rawCall <- peek callPtr + let call = U.ServerCall rawCall + metadataArrayPtr + Nothing + callDetails + return $ Right call + + --TODO: the gRPC library appears to hold onto these pointers for a random + -- amount of time, even after returning from the only call that uses them. + -- This results in malloc errors if + -- gRPC tries to modify them after we free them. To work around it, + -- we sleep for a while before freeing the objects. We should find a + -- permanent solution that's more robust. + where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do + threadDelaySecs 30 + grpcDebug "serverRequestCall: doing delayed cleanup." + free callPtr + C.destroyCallDetails callDetails + C.metadataArrayDestroy metadataArrayPtr + return () diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index 65529d6..aa4106f 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -3,12 +3,7 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Network.GRPC.LowLevel.GRPC where -{- --- TODO: remove if not needed -import Control.Monad.IO.Class (liftIO, MonadIO) -import Control.Monad.Except (ExceptT(..), runExceptT, throwError, - MonadError) --} +import Control.Concurrent (threadDelay) import Control.Exception import qualified Data.ByteString as B import qualified Data.Map as M @@ -63,6 +58,9 @@ grpcDebug str = do tid <- myThreadId grpcDebug _ = return () #endif +threadDelaySecs :: Int -> IO () +threadDelaySecs = threadDelay . (* 10^(6::Int)) + {- -- TODO: remove this once finally decided on whether to use it. -- | Monad for running gRPC operations. diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index fc39016..b4fa0a6 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -7,22 +7,20 @@ import Control.Exception import qualified Data.ByteString as B import qualified Data.Map.Strict as M import Data.Maybe (catMaybes) -import Data.String (IsString) import Foreign.C.String (CString) import Foreign.C.Types (CInt) import Foreign.Marshal.Alloc (free, malloc, mallocBytes) import Foreign.Ptr (Ptr, nullPtr) import Foreign.Storable (peek, poke) +import Network.GRPC.LowLevel.Call +import Network.GRPC.LowLevel.CompletionQueue +import Network.GRPC.LowLevel.GRPC import qualified Network.GRPC.Unsafe as C (Call) import qualified Network.GRPC.Unsafe.ByteBuffer as C import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C -import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.CompletionQueue -import Network.GRPC.LowLevel.GRPC - -- | Sum describing all possible send and receive operations that can be batched -- and executed by gRPC. Usually these are processed in a handful of -- combinations depending on the 'MethodType' of the call being run. @@ -183,21 +181,37 @@ resultFromOpContext _ = do grpcDebug "resultFromOpContext: saw non-result op type." return Nothing ---TODO: the list of 'Op's type is less specific than it could be. There are only --- a few different sequences of 'Op's we will see in practice. Once we figure --- out what those are, we should create a more specific sum type. However, since --- ops can fail, the list of 'OpRecvResult' returned by 'runOps' can vary in --- their contents and are perhaps less amenable to simplification. --- In the meantime, from looking at the core tests, it looks like it is safe to --- say that we always get a GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use --- the same 'Op' twice in the same batch, so we might want to change the list to --- a set. I don't think order matters within a batch. Need to check. +-- | For a given call, run the given 'Op's on the given completion queue with +-- the given tag. Blocks until the ops are complete or the given number of +-- seconds have elapsed. TODO: now that we distinguish between different types +-- of calls at the type level, we could try to limit the input 'Op's more +-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a +-- registered call, because gRPC handles that for us. + +-- TODO: the list of 'Op's type is less specific than it could be. There are +-- only a few different sequences of 'Op's we will see in practice. Once we +-- figure out what those are, we should create a more specific sum +-- type. However, since ops can fail, the list of 'OpRecvResult' returned by +-- 'runOps' can vary in their contents and are perhaps less amenable to +-- simplification. In the meantime, from looking at the core tests, it looks +-- like it is safe to say that we always get a +-- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use the same 'Op' twice in +-- the same batch, so we might want to change the list to a set. I don't think +-- order matters within a batch. Need to check. runOps :: C.Call - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) + -- ^ 'Call' that this batch is associated with. One call can be + -- associated with many batches. + -> CompletionQueue + -- ^ Queue on which our tag will be placed once our ops are done + -- running. + -> [Op] + -- ^ The list of 'Op's to execute. + -> TimeoutSeconds + -- ^ How long to block waiting for the tag to appear on the queue. If + -- we time out, the result of this action will be @CallBatchError + -- BatchTimeout@. + -> IO (Either GRPCIOError [OpRecvResult]) runOps call cq ops timeLimit = let l = length ops in withOpArray l $ \opArray -> do @@ -220,48 +234,11 @@ runOps call cq ops timeLimit = fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err --- | For a given call, run the given 'Op's on the given completion queue with --- the given tag. Blocks until the ops are complete or the given number of --- seconds have elapsed. --- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we --- could try to limit the input 'Op's more appropriately. E.g., we don't use --- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC --- handles that for us. -runServerRegOps :: ServerRegCall - -- ^ 'Call' that this batch is associated with. One call can be - -- associated with many batches. - -> CompletionQueue - -- ^ Queue on which our tag will be placed once our ops are done - -- running. - -> [Op] - -- ^ The list of 'Op's to execute. - -> TimeoutSeconds - -- ^ How long to block waiting for the tag to appear on the - --queue. If we time out, the result of this action will be - -- @CallBatchError BatchTimeout@. - -> IO (Either GRPCIOError [OpRecvResult]) -runServerRegOps = runOps . internalServerRegCall - -runServerUnregOps :: ServerUnregCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runServerUnregOps = runOps . internalServerUnregCall - --- | Like 'runServerOps', but for client-side calls. -runClientOps :: ClientCall - -> CompletionQueue - -> [Op] - -> TimeoutSeconds - -> IO (Either GRPCIOError [OpRecvResult]) -runClientOps = runOps . internalClientCall - -- | If response status info is present in the given 'OpRecvResult's, returns -- a tuple of trailing metadata, status code, and status details. extractStatusInfo :: [OpRecvResult] -> Maybe (MetadataMap, C.StatusCode, B.ByteString) extractStatusInfo [] = Nothing -extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = +extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) = Just (meta, code, details) extractStatusInfo (_:xs) = extractStatusInfo xs diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 4c3bc74..aaca99b 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -1,15 +1,14 @@ {-# LANGUAGE RecordWildCards #-} +-- | This module defines data structures and operations pertaining to registered +-- servers using registered calls; for unregistered support, see +-- `Network.GRPC.LowLevel.Server.Unregistered`. module Network.GRPC.LowLevel.Server where -import Control.Concurrent (threadDelay) import Control.Exception (bracket, finally) import Control.Monad import Data.ByteString (ByteString) import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C - import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, TimeoutSeconds, @@ -17,26 +16,27 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, pluck, serverRegisterCompletionQueue, serverRequestCall, - serverRequestRegisteredCall, serverShutdownAndNotify, shutdownCompletionQueue) import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C -- | Wraps various gRPC state needed to run a server. data Server = Server - { internalServer :: C.Server - , serverCQ :: CompletionQueue + { internalServer :: C.Server + , serverCQ :: CompletionQueue , registeredMethods :: [RegisteredMethod] - , serverConfig :: ServerConfig + , serverConfig :: ServerConfig } -- | Configuration needed to start a server. data ServerConfig = ServerConfig - { host :: Host + { host :: Host -- ^ Name of the host the server is running on. Not sure how this is -- used. Setting to "localhost" works fine in tests. - , port :: Port + , port :: Port -- ^ Port on which to listen for requests. , methodsToRegister :: [(MethodName, GRPCMethodType)] -- ^ List of (method name, method type) tuples specifying all methods to @@ -122,40 +122,27 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- | Create a 'Call' with which to wait for the invocation of a registered -- method. -serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverCreateRegisteredCall Server{..} rm timeLimit initMeta = - serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta +serverCreateCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverCreateCall Server{..} rm timeLimit initMeta = + serverRequestCall internalServer serverCQ timeLimit rm initMeta -withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> (ServerRegCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withServerRegisteredCall server regmethod timeout initMeta f = do - createResult <- serverCreateRegisteredCall server regmethod timeout initMeta +withServerCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> (ServerCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withServerCall server regmethod timeout initMeta f = do + createResult <- serverCreateCall server regmethod timeout initMeta case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." - >> destroyServerRegCall c - -serverCreateUnregCall :: Server -> TimeoutSeconds - -> IO (Either GRPCIOError ServerUnregCall) -serverCreateUnregCall Server{..} timeLimit = - serverRequestCall internalServer serverCQ timeLimit - -withServerUnregCall :: Server -> TimeoutSeconds - -> (ServerUnregCall -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withServerUnregCall server timeout f = do - createResult <- serverCreateUnregCall server timeout - case createResult of - Left x -> return $ Left x - Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withServerCall: destroying." - >> destroyServerUnregCall c + >> destroyServerCall c -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. serverOpsGetNormalCall :: MetadataMap -> [Op] @@ -189,84 +176,49 @@ serverOpsSendNormalRegisteredResponse OpSendMessage body, OpSendStatusFromServer trailingMeta code details] +-- | A handler for an registered server call; bytestring parameter is request +-- body, with the bytestring response body in the result tuple. The first +-- metadata parameter refers to the request metadata, with the two metadata +-- values in the result tuple being the initial and trailing metadata +-- respectively. + +-- TODO: make a more rigid type for this with a Maybe MetadataMap for the +-- trailing meta, and use it for both kinds of call handlers. +type ServerHandler + = ByteString -> MetadataMap + -> IO (ByteString, MetadataMap, MetadataMap, StatusDetails) + -- TODO: we will want to replace this with some more general concept that also -- works with streaming calls in the future. -- | Wait for and then handle a normal (non-streaming) call. -serverHandleNormalRegisteredCall :: Server - -> RegisteredMethod - -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata - -> (ByteString -> MetadataMap - -> IO (ByteString, - MetadataMap, - MetadataMap, - StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and - -- metadata. - -> IO (Either GRPCIOError ()) -serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do +serverHandleNormalCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -- ^ Initial server metadata + -> ServerHandler + -> IO (Either GRPCIOError ()) +serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit. -- Should we just hard-code time limits instead? Not sure if client -- programmer cares, since this function will likely just be put in a loop -- anyway. - withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do - grpcDebug "serverHandleNormalRegisteredCall: starting batch." - debugServerRegCall call - payload <- serverRegCallGetPayload call + withServerCall s rm timeLimit srvMetadata $ \call -> do + grpcDebug "serverHandleNormalCall(R): starting batch." + debugServerCall call + payload <- serverCallGetPayload call case payload of --TODO: what should we do with an empty payload? Have the handler take -- @Maybe ByteString@? Need to figure out when/why payload would be empty. - Nothing -> error "serverHandleNormalRegisteredCall: payload empty." + Nothing -> error "serverHandleNormalCall(R): payload empty." Just requestBody -> do - requestMeta <- serverRegCallGetMetadata call + requestMeta <- serverCallGetMetadata call (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta let status = C.GrpcStatusOk let respOps = serverOpsSendNormalRegisteredResponse respBody initMeta trailingMeta status details - respOpsResults <- runServerRegOps call serverCQ respOps timeLimit - grpcDebug "serverHandleNormalRegisteredCall: finished response ops." + respOpsResults <- runOps (unServerCall call) serverCQ respOps timeLimit + grpcDebug "serverHandleNormalCall(R): finished response ops." case respOpsResults of Left x -> return $ Left x Right _ -> return $ Right () - --- TODO: This is preliminary. --- We still need to provide the method name to the handler. --- | Handle one unregistered call. -serverHandleNormalCall :: Server -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata. - -> (ByteString -> MetadataMap -> MethodName - -> IO (ByteString, MetadataMap, StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and metadata. - -> IO (Either GRPCIOError ()) -serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do - withServerUnregCall s timeLimit $ \call -> do - grpcDebug "serverHandleNormalCall: starting batch." - let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- runServerUnregOps call serverCQ recvOps timeLimit - case opResults of - Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" - return $ Left x - Right [OpRecvMessageResult (Just body)] -> do - requestMeta <- serverUnregCallGetMetadata call - grpcDebug $ "got client metadata: " ++ show requestMeta - methodName <- serverUnregCallGetMethodName call - hostName <- serverUnregCallGetHost call - grpcDebug $ "call_details host is: " ++ show hostName - (respBody, respMetadata, details) <- f body requestMeta methodName - let status = C.GrpcStatusOk - let respOps = serverOpsSendNormalResponse - respBody respMetadata status details - respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit - case respOpsResults of - Left x -> do grpcDebug "serverHandleNormalCall: resp failed." - return $ Left x - Right _ -> grpcDebug "serverHandleNormalCall: ops done." - >> return (Right ()) - x -> error $ "impossible pattern match: " ++ show x - -_nowarn_unused :: a -_nowarn_unused = undefined threadDelay diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs new file mode 100644 index 0000000..e513bac --- /dev/null +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -0,0 +1,71 @@ +{-# LANGUAGE RecordWildCards #-} + +module Network.GRPC.LowLevel.Server.Unregistered where + +import Control.Exception (finally) +import Data.ByteString (ByteString) +import Network.GRPC.LowLevel.Call (MethodName) +import Network.GRPC.LowLevel.Call.Unregistered +import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) +import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall) +import Network.GRPC.LowLevel.GRPC +import Network.GRPC.LowLevel.Op (OpRecvResult (..), runOps) +import Network.GRPC.LowLevel.Server (Server (..), + serverOpsGetNormalCall, + serverOpsSendNormalResponse) +import qualified Network.GRPC.Unsafe.Op as C + +serverCreateCall :: Server -> TimeoutSeconds + -> IO (Either GRPCIOError ServerCall) +serverCreateCall Server{..} timeLimit = + serverRequestCall internalServer serverCQ timeLimit + +withServerCall :: Server -> TimeoutSeconds + -> (ServerCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withServerCall server timeout f = do + createResult <- serverCreateCall server timeout + case createResult of + Left x -> return $ Left x + Right call -> f call `finally` logDestroy call + where logDestroy c = grpcDebug "withServerCall: destroying." + >> destroyServerCall c + +-- | A handler for an unregistered server call; bytestring arguments are the +-- request body and response body respectively. +type ServerHandler + = ByteString -> MetadataMap -> MethodName + -> IO (ByteString, MetadataMap, StatusDetails) + +-- | Handle one unregistered call. +serverHandleNormalCall :: Server + -> TimeoutSeconds + -> MetadataMap -- ^ Initial server metadata. + -> ServerHandler + -> IO (Either GRPCIOError ()) +serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do + withServerCall s timeLimit $ \call -> do + grpcDebug "serverHandleNormalCall(U): starting batch." + let recvOps = serverOpsGetNormalCall srvMetadata + call' = unServerCall call + opResults <- runOps call' serverCQ recvOps timeLimit + case opResults of + Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting" + return $ Left x + Right [OpRecvMessageResult (Just body)] -> do + requestMeta <- serverCallGetMetadata call + grpcDebug $ "got client metadata: " ++ show requestMeta + methodName <- serverCallGetMethodName call + hostName <- serverCallGetHost call + grpcDebug $ "call_details host is: " ++ show hostName + (respBody, respMetadata, details) <- f body requestMeta methodName + let status = C.GrpcStatusOk + let respOps = serverOpsSendNormalResponse + respBody respMetadata status details + respOpsResults <- runOps call' serverCQ respOps timeLimit + case respOpsResults of + Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed." + return $ Left x + Right _ -> grpcDebug "serverHandleNormalCall(U): ops done." + >> return (Right ()) + x -> error $ "impossible pattern match: " ++ show x diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index d2d5d90..8f542ce 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -5,25 +5,30 @@ module LowLevelTests (lowLevelTests) where -import Control.Concurrent (threadDelay) +import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.Map as M +import Data.ByteString (ByteString) +import qualified Data.Map as M import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Client.Unregistered as U +import qualified Network.GRPC.LowLevel.Server.Unregistered as U import Test.Tasty -import Test.Tasty.HUnit as HU (Assertion, assertEqual, - assertFailure, testCase, (@?=)) +import Test.Tasty.HUnit as HU (Assertion, + assertEqual, + assertFailure, + testCase, + (@?=)) lowLevelTests :: TestTree lowLevelTests = testGroup "Unit tests of low-level Haskell library" [ testGRPCBracket , testCompletionQueueCreateDestroy - , testServerCreateDestroy , testClientCreateDestroy - , testWithServerCall - , testWithClientCall + , testClientCall , testClientTimeoutNoServer + , testServerCreateDestroy + , testServerCall , testServerTimeoutNoClient -- , testWrongEndpoint , testPayload @@ -39,38 +44,38 @@ testCompletionQueueCreateDestroy = testCase "Create/destroy CQ" $ withGRPC $ \g -> withCompletionQueue g nop -testServerCreateDestroy :: TestTree -testServerCreateDestroy = - serverOnlyTest "start/stop" [] nop - testClientCreateDestroy :: TestTree testClientCreateDestroy = clientOnlyTest "start/stop" nop -testWithServerCall :: TestTree -testWithServerCall = - serverOnlyTest "create/destroy call" [] $ \s -> do - r <- withServerUnregCall s 1 $ const $ return $ Right () - r @?= Left GRPCIOTimeout - -testWithClientCall :: TestTree -testWithClientCall = +testClientCall :: TestTree +testClientCall = clientOnlyTest "create/destroy call" $ \c -> do - r <- withClientCall c "foo" 10 $ const $ return $ Right () + r <- U.withClientCall c "/foo" 10 $ const $ return $ Right () r @?= Right () testClientTimeoutNoServer :: TestTree testClientTimeoutNoServer = clientOnlyTest "request timeout when server DNE" $ \c -> do rm <- clientRegisterMethod c "/foo" Normal - r <- clientRegisteredRequest c rm 1 "Hello" mempty + r <- clientRequest c rm 1 "Hello" mempty + r @?= Left GRPCIOTimeout + +testServerCreateDestroy :: TestTree +testServerCreateDestroy = + serverOnlyTest "start/stop" [] nop + +testServerCall :: TestTree +testServerCall = + serverOnlyTest "create/destroy call" [] $ \s -> do + r <- U.withServerCall s 1 $ const $ return $ Right () r @?= Left GRPCIOTimeout testServerTimeoutNoClient :: TestTree testServerTimeoutNoClient = serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 1 mempty $ \_ _ -> + r <- serverHandleNormalCall s rm 1 mempty $ \_ _ -> return ("", mempty, mempty, StatusDetails "details") r @?= Left GRPCIOTimeout @@ -85,13 +90,13 @@ testWrongEndpoint = -- further client c = do rm <- clientRegisterMethod c "/bar" Normal - r <- clientRegisteredRequest c rm 1 "Hello!" mempty + r <- clientRequest c rm 1 "Hello!" mempty r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded (StatusDetails "Deadline Exceeded")) server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 10 mempty $ \_ _ -> do + r <- serverHandleNormalCall s rm 10 mempty $ \_ _ -> do return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") r @?= Right () @@ -108,7 +113,7 @@ testPayload = clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")] client c = do rm <- clientRegisterMethod c "/foo" Normal - clientRegisteredRequest c rm 10 "Hello!" clientMD >>= do + clientRequest c rm 10 "Hello!" clientMD >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" @@ -118,7 +123,7 @@ testPayload = server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 11 mempty $ \reqBody reqMD -> do + r <- serverHandleNormalCall s rm 11 mempty $ \reqBody reqMD -> do reqBody @?= "Hello!" checkMD "Server metadata mismatch" clientMD reqMD return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") @@ -129,13 +134,13 @@ testPayloadUnregistered = csTest "unregistered normal request/response" client server [] where client c = do - clientRequest c "/foo" 10 "Hello!" mempty >>= do + U.clientRequest c "/foo" 10 "Hello!" mempty >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" details @?= "details string" server s = do - r <- serverHandleNormalCall s 11 mempty $ \body _md meth -> do + r <- U.serverHandleNormalCall s 11 mempty $ \body _md meth -> do body @?= "Hello!" meth @?= "/foo" return ("reply test", mempty, "details string")