diff --git a/examples/echo/echo-client/Main.hs b/examples/echo/echo-client/Main.hs index a430f1f..c5ff1cb 100644 --- a/examples/echo/echo-client/Main.hs +++ b/examples/echo/echo-client/Main.hs @@ -1,19 +1,20 @@ -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} {-# OPTIONS_GHC -fno-warn-missing-signatures #-} {-# OPTIONS_GHC -fno-warn-unused-binds #-} import Control.Monad import Network.GRPC.LowLevel +import qualified Network.GRPC.LowLevel.Client.Unregistered as U echoMethod = MethodName "/echo.Echo/DoEcho" unregistered c = do - clientRequest c echoMethod 1 "hi" mempty + U.clientRequest c echoMethod 1 "hi" mempty registered c = do meth <- clientRegisterMethod c echoMethod Normal - clientRegisteredRequest c meth 1 "hi" mempty + clientRequest c meth 1 "hi" mempty run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c -> f c >>= \case diff --git a/examples/echo/echo-server/Main.hs b/examples/echo/echo-server/Main.hs index 3a28d25..db89af2 100644 --- a/examples/echo/echo-server/Main.hs +++ b/examples/echo/echo-server/Main.hs @@ -33,7 +33,7 @@ regMain = withGRPC $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> forever $ do let method = head (registeredMethods server) - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of @@ -43,7 +43,7 @@ regMain = withGRPC $ \grpc -> do -- | loop to fork n times regLoop :: Server -> RegisteredMethod -> IO () regLoop server method = forever $ do - result <- serverHandleNormalRegisteredCall server method 15 serverMeta $ + result <- serverHandleNormalCall server method 15 serverMeta $ \reqBody _reqMeta -> return (reqBody, serverMeta, serverMeta, StatusDetails "") case result of diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index 6375ae2..a94fb86 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -43,6 +43,7 @@ library Network.GRPC.Unsafe Network.GRPC.LowLevel Network.GRPC.LowLevel.Server.Unregistered + Network.GRPC.LowLevel.Client.Unregistered other-modules: Network.GRPC.LowLevel.CompletionQueue Network.GRPC.LowLevel.CompletionQueue.Internal @@ -54,7 +55,6 @@ library Network.GRPC.LowLevel.Call Network.GRPC.LowLevel.Call.Unregistered Network.GRPC.LowLevel.Client - Network.GRPC.LowLevel.Client.Unregistered extra-libraries: grpc includes: diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index 7bab209..7485fda 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -29,11 +29,11 @@ GRPC -- * Server , ServerConfig(..) , Server -, ServerRegCall +, ServerCall , registeredMethods , withServer -, serverHandleNormalRegisteredCall -, withServerRegisteredCall +, serverHandleNormalCall +, withServerCall -- * Client , ClientConfig(..) @@ -43,13 +43,12 @@ GRPC , clientConnectivity , withClient , clientRegisterMethod -, clientRegisteredRequest , clientRequest , withClientCall -- * Ops , runClientOps -, runServerRegOps +, runServerOps , Op(..) , OpRecvResult(..) @@ -60,7 +59,6 @@ import Network.GRPC.LowLevel.Server import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client -import Network.GRPC.LowLevel.Client.Unregistered import Network.GRPC.LowLevel.Call import Network.GRPC.Unsafe (ConnectivityState(..)) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index 127b4ca..2907b01 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -54,32 +54,31 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType, -- | Represents one GRPC call (i.e. request) on the client. -- This is used to associate send/receive 'Op's with a request. -data ClientCall = ClientCall {internalClientCall :: C.Call} +data ClientCall = ClientCall { unClientCall :: C.Call } --- | Represents one registered GRPC call on the server. --- Contains pointers to all the C state needed to respond to a registered call. -data ServerRegCall = ServerRegCall - {internalServerRegCall :: C.Call, - requestMetadataRecvReg :: Ptr C.MetadataArray, - optionalPayload :: Ptr C.ByteBuffer, - parentPtrReg :: Maybe (Ptr C.Call), - callDeadline :: C.CTimeSpecPtr +-- | Represents one registered GRPC call on the server. Contains pointers to all +-- the C state needed to respond to a registered call. +data ServerCall = ServerCall + { unServerCall :: C.Call, + requestMetadataRecv :: Ptr C.MetadataArray, + optionalPayload :: Ptr C.ByteBuffer, + parentPtr :: Maybe (Ptr C.Call), + callDeadline :: C.CTimeSpecPtr } -serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap -serverRegCallGetMetadata ServerRegCall{..} = do - marray <- peek requestMetadataRecvReg +serverCallGetMetadata :: ServerCall -> IO MetadataMap +serverCallGetMetadata ServerCall{..} = do + marray <- peek requestMetadataRecv C.getAllMetadataArray marray --- | Extract the client request body from the given registered call, if present. --- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library --- calls the underlying out parameter "optional_payload". I am not sure exactly --- in what cases it won't be present. The C++ library checks a --- has_request_payload_ bool and passes in nullptr to request_registered_call --- if the bool is false, so we may be able to do the payload present/absent --- check earlier. -serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) -serverRegCallGetPayload ServerRegCall{..} = do +-- | Extract the client request body from the given call, if present. TODO: the +-- reason this returns @Maybe ByteString@ is because the gRPC library calls the +-- underlying out parameter "optional_payload". I am not sure exactly in what +-- cases it won't be present. The C++ library checks a has_request_payload_ bool +-- and passes in nullptr to request_registered_call if the bool is false, so we +-- may be able to do the payload present/absent check earlier. +serverCallGetPayload :: ServerCall -> IO (Maybe ByteString) +serverCallGetPayload ServerCall{..} = do bb@(C.ByteBuffer rawPtr) <- peek optionalPayload if rawPtr == nullPtr then return Nothing @@ -94,48 +93,47 @@ debugClientCall (ClientCall (C.Call ptr)) = debugClientCall = const $ return () #endif -debugServerRegCall :: ServerRegCall -> IO () +debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do - grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr) - grpcDebug $ "debugServerRegCall: metadata ptr: " - ++ show (requestMetadataRecvReg call) - metadataArr <- peek (requestMetadataRecvReg call) +debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do + grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) + grpcDebug $ "debugServerCall(R): metadata ptr: " + ++ show (requestMetadataRecv call) + metadataArr <- peek (requestMetadataRecv call) metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata) - grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call) + grpcDebug $ "debugServerCall(R): metadata received: " ++ (show metadata) + grpcDebug $ "debugServerCall(R): payload ptr: " ++ show (optionalPayload call) payload <- peek (optionalPayload call) bs <- C.copyByteBufferToByteString payload - grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs - forM_ (parentPtrReg call) $ \parentPtr' -> do - grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + grpcDebug $ "debugServerCall(R): payload contents: " ++ show bs + forM_ (parentPtr call) $ \parentPtr' -> do + grpcDebug $ "debugServerCall(R): parent ptr: " ++ show parentPtr' (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugServerRegCall: parent: " ++ show parent - grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call) + grpcDebug $ "debugServerCall(R): parent: " ++ show parent + grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) timespec <- peek (callDeadline call) - grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec) + grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec) #else -{-# INLINE debugServerRegCall #-} -debugServerRegCall = const $ return () +{-# INLINE debugServerCall #-} +debugServerCall = const $ return () #endif destroyClientCall :: ClientCall -> IO () destroyClientCall ClientCall{..} = do grpcDebug "Destroying client-side call object." - C.grpcCallDestroy internalClientCall + C.grpcCallDestroy unClientCall -destroyServerRegCall :: ServerRegCall -> IO () -destroyServerRegCall call@ServerRegCall{..} = do - grpcDebug "destroyServerRegCall: entered." - debugServerRegCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerRegCall - C.grpcCallDestroy internalServerRegCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg - C.metadataArrayDestroy requestMetadataRecvReg +destroyServerCall :: ServerCall -> IO () +destroyServerCall call@ServerCall{..} = do + grpcDebug "destroyServerCall(R): entered." + debugServerCall call + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv + C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "destroying optional payload" ++ show optionalPayload C.destroyReceivingByteBuffer optionalPayload - grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg - forM_ parentPtrReg free + grpcDebug $ "freeing parentPtr: " ++ show parentPtr + forM_ parentPtr free grpcDebug $ "destroying deadline." ++ show callDeadline C.timespecDestroy callDeadline diff --git a/src/Network/GRPC/LowLevel/Call/Unregistered.hs b/src/Network/GRPC/LowLevel/Call/Unregistered.hs index 60307ef..dc714f3 100644 --- a/src/Network/GRPC/LowLevel/Call/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Call/Unregistered.hs @@ -3,21 +3,19 @@ module Network.GRPC.LowLevel.Call.Unregistered where import Control.Monad -import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr) -import Foreign.Storable (peek) - -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Metadata as C - -import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) +import Foreign.Marshal.Alloc (free) +import Foreign.Ptr (Ptr) +import Foreign.Storable (peek) +import Network.GRPC.LowLevel.Call (Host (..), MethodName (..)) +import Network.GRPC.LowLevel.GRPC (MetadataMap, grpcDebug) +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Metadata as C -- | Represents one unregistered GRPC call on the server. -- Contains pointers to all the C state needed to respond to an unregistered -- call. data ServerCall = ServerCall - { internalServerCall :: C.Call + { unServerCall :: C.Call , requestMetadataRecv :: Ptr C.MetadataArray , parentPtr :: Maybe (Ptr C.Call) , callDetails :: C.CallDetails @@ -61,9 +59,8 @@ destroyServerCall :: ServerCall -> IO () destroyServerCall call@ServerCall{..} = do grpcDebug "destroyServerCall(U): entered." debugServerCall call - grpcDebug $ "Destroying server-side call object: " - ++ show internalServerCall - C.grpcCallDestroy internalServerCall + grpcDebug $ "Destroying server-side call object: " ++ show unServerCall + C.grpcCallDestroy unServerCall grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv C.metadataArrayDestroy requestMetadataRecv grpcDebug $ "freeing parentPtr: " ++ show parentPtr diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index b2324aa..bbb2e8b 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -58,7 +58,7 @@ clientConnectivity Client{..} = C.grpcChannelCheckConnectivityState clientChannel False -- | Register a method on the client so that we can call it with --- 'clientRegisteredRequest'. +-- 'clientRequest'. clientRegisterMethod :: Client -> MethodName -- ^ method name, e.g. "/foo" @@ -74,27 +74,30 @@ clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented." -- | Create a new call on the client for a registered method. -- Returns 'Left' if the CQ is shutting down or if the job to create a call -- timed out. -clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> IO (Either GRPCIOError ClientCall) -clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do +clientCreateCall :: Client + -> RegisteredMethod + -> TimeoutSeconds + -> IO (Either GRPCIOError ClientCall) +clientCreateCall Client{..} RegisteredMethod{..} timeout = do let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though. C.withDeadlineSeconds timeout $ \deadline -> do - channelCreateRegisteredCall clientChannel parentCall C.propagateDefaults - clientCQ methodHandle deadline + channelCreateCall clientChannel parentCall C.propagateDefaults + clientCQ methodHandle deadline -- TODO: the error-handling refactor made this quite ugly. It could be fixed -- by switching to ExceptT IO. -- | Handles safe creation and cleanup of a client call -withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> (ClientCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withClientRegisteredCall client regmethod timeout f = do - createResult <- clientCreateRegisteredCall client regmethod timeout +withClientCall :: Client + -> RegisteredMethod + -> TimeoutSeconds + -> (ClientCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withClientCall client regmethod timeout f = do + createResult <- clientCreateCall client regmethod timeout case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientRegisteredCall: destroying." + where logDestroy c = grpcDebug "withClientCall(R): destroying." >> destroyClientCall c data NormalRequestResult = NormalRequestResult @@ -131,25 +134,24 @@ compileNormalRequestResults x = -- server's response. TODO: This is preliminary until we figure out how many -- different variations on sending request ops will be needed for full gRPC -- functionality. -clientRegisteredRequest :: Client - -> RegisteredMethod - -> TimeoutSeconds - -- ^ Timeout of both the grpc_call and the - -- max time to wait for the completion of the batch. - -- TODO: I think we will need to decouple the - -- lifetime of the call from the queue deadline once - -- we expose functionality for streaming calls, where - -- one call object persists across many batches. - -> ByteString - -- ^ The body of the request. - -> MetadataMap - -- ^ Metadata to send with the request. - -> IO (Either GRPCIOError NormalRequestResult) -clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) - timeLimit body meta = +clientRequest :: Client + -> RegisteredMethod + -> TimeoutSeconds + -- ^ Timeout of both the grpc_call and the max time to wait for + -- the completion of the batch. TODO: I think we will need to + -- decouple the lifetime of the call from the queue deadline once + -- we expose functionality for streaming calls, where one call + -- object persists across many batches. + -> ByteString + -- ^ The body of the request + -> MetadataMap + -- ^ Metadata to send with the request + -> IO (Either GRPCIOError NormalRequestResult) +clientRequest client@(Client{..}) rm@(RegisteredMethod{..}) + timeLimit body meta = fmap join $ case methodType of - Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do - grpcDebug "clientRegisteredRequest: created call." + Normal -> withClientCall client rm timeLimit $ \call -> do + grpcDebug "clientRequest(R): created call." debugClientCall call -- NOTE: sendOps and recvOps *must* be in separate batches or -- the client hangs when the server can't be reached. @@ -158,7 +160,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) , OpSendCloseFromClient] sendRes <- runClientOps call clientCQ sendOps timeLimit case sendRes of - Left x -> do grpcDebug "clientRegisteredRequest: batch error." + Left x -> do grpcDebug "clientRequest(R) : batch error." return $ Left x Right rs -> do let recvOps = [OpRecvInitialMetadata, @@ -167,7 +169,7 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) recvRes <- runClientOps call clientCQ recvOps timeLimit case recvRes of Left x -> do - grpcDebug "clientRegisteredRequest: batch error." + grpcDebug "clientRequest(R): batch error." return $ Left x Right rs' -> do return $ Right $ compileNormalRequestResults (rs ++ rs') diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs index 7f5fa33..03b6082 100644 --- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs @@ -11,7 +11,11 @@ import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Time as C import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.Client +import Network.GRPC.LowLevel.Client (Client (..), + NormalRequestResult (..), + clientEndpoint, + clientNormalRequestOps, + compileNormalRequestResults) import Network.GRPC.LowLevel.CompletionQueue (TimeoutSeconds) import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC @@ -40,7 +44,7 @@ withClientCall client method timeout f = do case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call - where logDestroy c = grpcDebug "withClientCall: destroying." + where logDestroy c = grpcDebug "withClientCall(U): destroying." >> destroyClientCall c -- | Makes a normal (non-streaming) request without needing to register a method @@ -61,7 +65,7 @@ clientRequest client@Client{..} meth timeLimit body meta = withClientCall client meth timeLimit $ \call -> do let ops = clientNormalRequestOps body meta results <- runClientOps call clientCQ ops timeLimit - grpcDebug "clientRequest: ops ran." + grpcDebug "clientRequest(U): ops ran." case results of Left x -> return $ Left x Right rs -> return $ Right $ compileNormalRequestResults rs diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index d924e05..3e78bb9 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -19,12 +19,12 @@ module Network.GRPC.LowLevel.CompletionQueue , shutdownCompletionQueue , pluck , startBatch - , channelCreateRegisteredCall + , channelCreateCall , TimeoutSeconds , isEventSuccessful , serverRegisterCompletionQueue , serverShutdownAndNotify - , serverRequestRegisteredCall + , serverRequestCall , newTag ) where @@ -104,14 +104,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do C.QueueTimeout -> drainLoop C.OpComplete -> drainLoop -channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask - -> CompletionQueue -> C.CallHandle - -> C.CTimeSpecPtr - -> IO (Either GRPCIOError ClientCall) -channelCreateRegisteredCall +channelCreateCall :: C.Channel + -> C.Call + -> C.PropagationMask + -> CompletionQueue + -> C.CallHandle + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) +channelCreateCall chan parent mask cq@CompletionQueue{..} handle deadline = withPermission Push cq $ do - grpcDebug $ "channelCreateRegisteredCall: call with " + grpcDebug $ "channelCreateCall: call with " ++ concat (intersperse " " [show chan, show parent, show mask, show unsafeCQ, show handle, show deadline]) @@ -120,10 +123,13 @@ channelCreateRegisteredCall return $ Right $ ClientCall call -- | Create the call object to handle a registered call. -serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> RegisteredMethod -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverRequestRegisteredCall +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> RegisteredMethod + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverRequestCall server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta = withPermission Push cq $ do -- TODO: Is gRPC supposed to populate this deadline? @@ -146,28 +152,28 @@ serverRequestRegisteredCall callError <- C.grpcServerRequestRegisteredCall server methodHandle callPtr deadline metadataArray bbPtr unsafeCQ unsafeCQ tag - grpcDebug $ "serverRequestRegisteredCall: callError: " + grpcDebug $ "serverRequestCall(R): callError: " ++ show callError if callError /= C.CallOk - then do grpcDebug "serverRequestRegisteredCall: callError. cleaning up" + then do grpcDebug "serverRequestCall(R): callError. cleaning up" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag timeLimit - grpcDebug "serverRequestRegisteredCall: finished pluck." + grpcDebug "serverRequestCall(R): finished pluck." case pluckResult of Left x -> do - grpcDebug "serverRequestRegisteredCall: cleanup pluck err" + grpcDebug "serverRequestCall(R): cleanup pluck err" failureCleanup deadline callPtr metadataArrayPtr bbPtr return $ Left x Right () -> do rawCall <- peek callPtr - let assembledCall = ServerRegCall rawCall metadataArrayPtr - bbPtr Nothing deadline + let assembledCall = ServerCall rawCall metadataArrayPtr + bbPtr Nothing deadline return $ Right assembledCall -- TODO: see TODO for failureCleanup in serverRequestCall. where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do threadDelaySecs 30 - grpcDebug "serverRequestRegisteredCall: doing delayed cleanup." + grpcDebug "serverRequestCall(R): doing delayed cleanup." C.timespecDestroy deadline free callPtr C.metadataArrayDestroy metadataArrayPtr diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs index 40ea875..67cc169 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs @@ -29,8 +29,10 @@ channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline = return $ Right $ ClientCall call -serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError U.ServerCall) +serverRequestCall :: C.Server + -> CompletionQueue + -> TimeoutSeconds + -> IO (Either GRPCIOError U.ServerCall) serverRequestCall server cq@CompletionQueue{..} timeLimit = withPermission Push cq $ do callPtr <- malloc diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 7ee0fe5..ce17f1d 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -4,23 +4,22 @@ module Network.GRPC.LowLevel.Op where import Control.Exception -import qualified Data.ByteString as B -import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) -import Foreign.C.String (CString) -import Foreign.C.Types (CInt) -import Foreign.Marshal.Alloc (free, malloc, - mallocBytes) -import Foreign.Ptr (Ptr, nullPtr) -import Foreign.Storable (peek, poke) -import qualified Network.GRPC.Unsafe as C (Call) -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Metadata as C -import qualified Network.GRPC.Unsafe.Op as C - +import qualified Data.ByteString as B +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) +import Foreign.C.String (CString) +import Foreign.C.Types (CInt) +import Foreign.Marshal.Alloc (free, malloc, + mallocBytes) +import Foreign.Ptr (Ptr, nullPtr) +import Foreign.Storable (peek, poke) import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC +import qualified Network.GRPC.Unsafe as C (Call) +import qualified Network.GRPC.Unsafe.ByteBuffer as C +import qualified Network.GRPC.Unsafe.Metadata as C +import qualified Network.GRPC.Unsafe.Op as C -- | Sum describing all possible send and receive operations that can be batched -- and executed by gRPC. Usually these are processed in a handful of @@ -219,26 +218,26 @@ runOps call cq ops timeLimit = fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err --- | For a given call, run the given 'Op's on the given completion queue with --- the given tag. Blocks until the ops are complete or the given number of +-- | For a given server call, run the given 'Op's on the given completion queue +-- with the given tag. Blocks until the ops are complete or the given number of -- seconds have elapsed. TODO: now that we distinguish between different types -- of calls at the type level, we could try to limit the input 'Op's more -- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a -- registered call, because gRPC handles that for us. -runServerRegOps :: ServerRegCall - -- ^ 'Call' that this batch is associated with. One call can be - -- associated with many batches. - -> CompletionQueue - -- ^ Queue on which our tag will be placed once our ops are done - -- running. - -> [Op] +runServerOps :: ServerCall + -- ^ 'Call' that this batch is associated with. One call can be + -- associated with many batches. + -> CompletionQueue + -- ^ Queue on which our tag will be placed once our ops are done + -- running. + -> [Op] -- ^ The list of 'Op's to execute. - -> TimeoutSeconds + -> TimeoutSeconds -- ^ How long to block waiting for the tag to appear on the - --queue. If we time out, the result of this action will be + -- queue. If we time out, the result of this action will be -- @CallBatchError BatchTimeout@. - -> IO (Either GRPCIOError [OpRecvResult]) -runServerRegOps = runOps . internalServerRegCall + -> IO (Either GRPCIOError [OpRecvResult]) +runServerOps = runOps . unServerCall -- | Like 'runServerOps', but for client-side calls. runClientOps :: ClientCall @@ -246,7 +245,7 @@ runClientOps :: ClientCall -> [Op] -> TimeoutSeconds -> IO (Either GRPCIOError [OpRecvResult]) -runClientOps = runOps . internalClientCall +runClientOps = runOps . unClientCall -- | If response status info is present in the given 'OpRecvResult's, returns -- a tuple of trailing metadata, status code, and status details. diff --git a/src/Network/GRPC/LowLevel/Op/Unregistered.hs b/src/Network/GRPC/LowLevel/Op/Unregistered.hs index c41b8a1..55b4077 100644 --- a/src/Network/GRPC/LowLevel/Op/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Op/Unregistered.hs @@ -10,4 +10,4 @@ runServerOps :: U.ServerCall -> [Op] -> TimeoutSeconds -> IO (Either GRPCIOError [OpRecvResult]) -runServerOps = runOps . U.internalServerCall +runServerOps = runOps . U.unServerCall diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 9ae18a2..b55151f 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -5,24 +5,23 @@ -- `Network.GRPC.LowLevel.Server.Unregistered`. module Network.GRPC.LowLevel.Server where -import Control.Exception (bracket, finally) +import Control.Exception (bracket, finally) import Control.Monad -import Data.ByteString (ByteString) -import Foreign.Ptr (nullPtr) -import qualified Network.GRPC.Unsafe as C -import qualified Network.GRPC.Unsafe.Op as C - +import Data.ByteString (ByteString) +import Foreign.Ptr (nullPtr) import Network.GRPC.LowLevel.Call -import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, - TimeoutSeconds, - createCompletionQueue, - pluck, - serverRegisterCompletionQueue, - serverRequestRegisteredCall, - serverShutdownAndNotify, - shutdownCompletionQueue) +import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, + TimeoutSeconds, + createCompletionQueue, + pluck, + serverRegisterCompletionQueue, + serverRequestCall, + serverShutdownAndNotify, + shutdownCompletionQueue) import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op +import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C -- | Wraps various gRPC state needed to run a server. data Server = Server @@ -123,24 +122,27 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- | Create a 'Call' with which to wait for the invocation of a registered -- method. -serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> IO (Either GRPCIOError ServerRegCall) -serverCreateRegisteredCall Server{..} rm timeLimit initMeta = - serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta +serverCreateCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> IO (Either GRPCIOError ServerCall) +serverCreateCall Server{..} rm timeLimit initMeta = + serverRequestCall internalServer serverCQ timeLimit rm initMeta -withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> MetadataMap - -> (ServerRegCall - -> IO (Either GRPCIOError a)) - -> IO (Either GRPCIOError a) -withServerRegisteredCall server regmethod timeout initMeta f = do - createResult <- serverCreateRegisteredCall server regmethod timeout initMeta +withServerCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -> (ServerCall -> IO (Either GRPCIOError a)) + -> IO (Either GRPCIOError a) +withServerCall server regmethod timeout initMeta f = do + createResult <- serverCreateCall server regmethod timeout initMeta case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." - >> destroyServerRegCall c + >> destroyServerCall c -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. serverOpsGetNormalCall :: MetadataMap -> [Op] @@ -174,44 +176,49 @@ serverOpsSendNormalRegisteredResponse OpSendMessage body, OpSendStatusFromServer trailingMeta code details] +-- | A handler for an registered server call; bytestring parameter is request +-- body, with the bytestring response body in the result tuple. The first +-- metadata parameter refers to the request metadata, with the two metadata +-- values in the result tuple being the initial and trailing metadata +-- respectively. + +-- TODO: make a more rigid type for this with a Maybe MetadataMap for the +-- trailing meta, and use it for both kinds of call handlers. +type ServerHandler + = ByteString -> MetadataMap + -> IO (ByteString, MetadataMap, MetadataMap, StatusDetails) + -- TODO: we will want to replace this with some more general concept that also -- works with streaming calls in the future. -- | Wait for and then handle a normal (non-streaming) call. -serverHandleNormalRegisteredCall :: Server - -> RegisteredMethod - -> TimeoutSeconds - -> MetadataMap - -- ^ Initial server metadata - -> (ByteString -> MetadataMap - -> IO (ByteString, - MetadataMap, - MetadataMap, - StatusDetails)) - -- ^ Handler function takes a request body and - -- metadata and returns a response body and - -- metadata. - -> IO (Either GRPCIOError ()) -serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do +serverHandleNormalCall :: Server + -> RegisteredMethod + -> TimeoutSeconds + -> MetadataMap + -- ^ Initial server metadata + -> ServerHandler + -> IO (Either GRPCIOError ()) +serverHandleNormalCall s@Server{..} rm timeLimit srvMetadata f = do -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit. -- Should we just hard-code time limits instead? Not sure if client -- programmer cares, since this function will likely just be put in a loop -- anyway. - withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do - grpcDebug "serverHandleNormalRegisteredCall: starting batch." - debugServerRegCall call - payload <- serverRegCallGetPayload call + withServerCall s rm timeLimit srvMetadata $ \call -> do + grpcDebug "serverHandleNormalCall(R): starting batch." + debugServerCall call + payload <- serverCallGetPayload call case payload of --TODO: what should we do with an empty payload? Have the handler take -- @Maybe ByteString@? Need to figure out when/why payload would be empty. - Nothing -> error "serverHandleNormalRegisteredCall: payload empty." + Nothing -> error "serverHandleNormalCall(R): payload empty." Just requestBody -> do - requestMeta <- serverRegCallGetMetadata call + requestMeta <- serverCallGetMetadata call (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta let status = C.GrpcStatusOk let respOps = serverOpsSendNormalRegisteredResponse respBody initMeta trailingMeta status details - respOpsResults <- runServerRegOps call serverCQ respOps timeLimit - grpcDebug "serverHandleNormalRegisteredCall: finished response ops." + respOpsResults <- runServerOps call serverCQ respOps timeLimit + grpcDebug "serverHandleNormalCall(R): finished response ops." case respOpsResults of Left x -> return $ Left x Right _ -> return $ Right () diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 1a007d4..4069a01 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -11,7 +11,9 @@ import qualified Network.GRPC.LowLevel.CompletionQueue.Unregistered as U import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op (OpRecvResult (..)) import qualified Network.GRPC.LowLevel.Op.Unregistered as U -import Network.GRPC.LowLevel.Server +import Network.GRPC.LowLevel.Server (Server (..), + serverOpsGetNormalCall, + serverOpsSendNormalResponse) import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server -> TimeoutSeconds @@ -44,7 +46,7 @@ serverHandleNormalCall :: Server -> IO (Either GRPCIOError ()) serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do withServerCall s timeLimit $ \call -> do - grpcDebug "serverHandleCall(U): starting batch." + grpcDebug "serverHandleNormalCall(U): starting batch." let recvOps = serverOpsGetNormalCall srvMetadata opResults <- U.runServerOps call serverCQ recvOps timeLimit case opResults of diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 2e0032c..8f542ce 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -11,7 +11,8 @@ import Control.Monad import Data.ByteString (ByteString) import qualified Data.Map as M import Network.GRPC.LowLevel -import Network.GRPC.LowLevel.Server.Unregistered as U +import qualified Network.GRPC.LowLevel.Client.Unregistered as U +import qualified Network.GRPC.LowLevel.Server.Unregistered as U import Test.Tasty import Test.Tasty.HUnit as HU (Assertion, assertEqual, @@ -50,14 +51,14 @@ testClientCreateDestroy = testClientCall :: TestTree testClientCall = clientOnlyTest "create/destroy call" $ \c -> do - r <- withClientCall c "foo" 10 $ const $ return $ Right () + r <- U.withClientCall c "/foo" 10 $ const $ return $ Right () r @?= Right () testClientTimeoutNoServer :: TestTree testClientTimeoutNoServer = clientOnlyTest "request timeout when server DNE" $ \c -> do rm <- clientRegisterMethod c "/foo" Normal - r <- clientRegisteredRequest c rm 1 "Hello" mempty + r <- clientRequest c rm 1 "Hello" mempty r @?= Left GRPCIOTimeout testServerCreateDestroy :: TestTree @@ -74,7 +75,7 @@ testServerTimeoutNoClient :: TestTree testServerTimeoutNoClient = serverOnlyTest "wait timeout when client DNE" [("/foo", Normal)] $ \s -> do let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 1 mempty $ \_ _ -> + r <- serverHandleNormalCall s rm 1 mempty $ \_ _ -> return ("", mempty, mempty, StatusDetails "details") r @?= Left GRPCIOTimeout @@ -89,13 +90,13 @@ testWrongEndpoint = -- further client c = do rm <- clientRegisterMethod c "/bar" Normal - r <- clientRegisteredRequest c rm 1 "Hello!" mempty + r <- clientRequest c rm 1 "Hello!" mempty r @?= Left (GRPCIOBadStatusCode GrpcStatusDeadlineExceeded (StatusDetails "Deadline Exceeded")) server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 10 mempty $ \_ _ -> do + r <- serverHandleNormalCall s rm 10 mempty $ \_ _ -> do return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") r @?= Right () @@ -112,7 +113,7 @@ testPayload = clientMD = [("foo_key", "foo_val"), ("bar_key", "bar_val")] client c = do rm <- clientRegisterMethod c "/foo" Normal - clientRegisteredRequest c rm 10 "Hello!" clientMD >>= do + clientRequest c rm 10 "Hello!" clientMD >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" @@ -122,7 +123,7 @@ testPayload = server s = do length (registeredMethods s) @?= 1 let rm = head (registeredMethods s) - r <- serverHandleNormalRegisteredCall s rm 11 mempty $ \reqBody reqMD -> do + r <- serverHandleNormalCall s rm 11 mempty $ \reqBody reqMD -> do reqBody @?= "Hello!" checkMD "Server metadata mismatch" clientMD reqMD return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") @@ -133,13 +134,13 @@ testPayloadUnregistered = csTest "unregistered normal request/response" client server [] where client c = do - clientRequest c "/foo" 10 "Hello!" mempty >>= do + U.clientRequest c "/foo" 10 "Hello!" mempty >>= do checkReqRslt $ \NormalRequestResult{..} -> do rspCode @?= GrpcStatusOk rspBody @?= "reply test" details @?= "details string" server s = do - r <- serverHandleNormalCall s 11 mempty $ \body _md meth -> do + r <- U.serverHandleNormalCall s 11 mempty $ \body _md meth -> do body @?= "Hello!" meth @?= "/foo" return ("reply test", mempty, "details string")