From 091bf4a4579d2e0a27c63b37f44295dc69662212 Mon Sep 17 00:00:00 2001 From: Connor Clark Date: Thu, 2 Jun 2016 08:46:20 -0700 Subject: [PATCH] Improve call-related code (#14) * projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall --- cbits/grpc_haskell.c | 12 ++ include/grpc_haskell.h | 6 + src/Network/GRPC/LowLevel.hs | 10 +- src/Network/GRPC/LowLevel/Call.hs | 202 ++++++++++++------- src/Network/GRPC/LowLevel/Client.hs | 93 ++++----- src/Network/GRPC/LowLevel/CompletionQueue.hs | 24 +-- src/Network/GRPC/LowLevel/GRPC.hs | 9 + src/Network/GRPC/LowLevel/Op.hs | 87 +++++--- src/Network/GRPC/LowLevel/Server.hs | 66 +++--- src/Network/GRPC/Unsafe.chs | 6 + tests/LowLevelTests.hs | 10 +- 11 files changed, 324 insertions(+), 201 deletions(-) diff --git a/cbits/grpc_haskell.c b/cbits/grpc_haskell.c index cd3d53e..c2f2228 100644 --- a/cbits/grpc_haskell.c +++ b/cbits/grpc_haskell.c @@ -359,3 +359,15 @@ grpc_call* grpc_channel_create_registered_call_( propagation_mask, completion_queue, registered_call_handle, *deadline, reserved); } + +char* call_details_get_method(grpc_call_details* details){ + return details->method; +} + +char* call_details_get_host(grpc_call_details* details){ + return details->host; +} + +gpr_timespec* call_details_get_deadline(grpc_call_details* details){ + return &(details->deadline); +} diff --git a/include/grpc_haskell.h b/include/grpc_haskell.h index aaa77f8..a2d0218 100644 --- a/include/grpc_haskell.h +++ b/include/grpc_haskell.h @@ -123,4 +123,10 @@ grpc_call* grpc_channel_create_registered_call_( grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec *deadline, void *reserved); +char* call_details_get_method(grpc_call_details* details); + +char* call_details_get_host(grpc_call_details* details); + +gpr_timespec* call_details_get_deadline(grpc_call_details* details); + #endif //GRPC_HASKELL diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index 5b94400..a0b076a 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -21,22 +21,24 @@ GRPC -- * Calls , GRPCMethodType(..) , RegisteredMethod -, Call , NormalRequestResult(..) -- * Server , ServerConfig(..) , Server +, ServerRegCall +, ServerUnregCall , registeredMethods , withServer , serverHandleNormalRegisteredCall , serverHandleNormalCall -, withServerCall +, withServerUnregCall , withServerRegisteredCall -- * Client , ClientConfig(..) , Client +, ClientCall , withClient , clientRegisterMethod , clientRegisteredRequest @@ -44,7 +46,9 @@ GRPC , withClientCall -- * Ops -, runOps +, runClientOps +, runServerRegOps +, runServerUnregOps , Op(..) , OpRecvResult(..) , StatusDetails(..) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index ac8663a..53fca50 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -4,9 +4,10 @@ module Network.GRPC.LowLevel.Call where import Control.Monad +import Data.ByteString (ByteString) import Data.String (IsString) import Foreign.Marshal.Alloc (free) -import Foreign.Ptr (Ptr, castPtr) +import Foreign.Ptr (Ptr, nullPtr) import Foreign.Storable (peek) import qualified Network.GRPC.Unsafe as C @@ -14,7 +15,7 @@ import qualified Network.GRPC.Unsafe.Time as C import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.ByteBuffer as C -import Network.GRPC.LowLevel.GRPC (grpcDebug) +import Network.GRPC.LowLevel.GRPC (grpcDebug, MetadataMap) -- | Models the four types of RPC call supported by gRPC. We currently only -- support the first alternative, and only in a preliminary fashion. @@ -38,78 +39,141 @@ data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType, methodHost :: Host, methodHandle :: C.CallHandle} --- | Represents one GRPC call (i.e. request). This type is used on both the --- client and server. Contains pointers to all the necessary C state needed to --- send and respond to a call. +-- | Represents one GRPC call (i.e. request) on the client. -- This is used to associate send/receive 'Op's with a request. --- There are separate functions for creating these depending on whether the --- method is registered and whether the call is on the client or server side. -data Call = ClientCall {internalCall :: C.Call} - | ServerCall - {internalCall :: C.Call, - requestMetadataRecv :: (Ptr C.MetadataArray), - optionalPayload :: Maybe (Ptr C.ByteBuffer), - parentPtr :: Maybe (Ptr C.Call), - callDetails :: Maybe (C.CallDetails), - -- ^ used on the server for non-registered calls - --, to identify the endpoint being used. - callDeadline :: Maybe C.CTimeSpecPtr - } +data ClientCall = ClientCall {internalClientCall :: C.Call} -debugCall :: Call -> IO () +-- | Represents one registered GRPC call on the server. +-- Contains pointers to all the C state needed to respond to a registered call. +data ServerRegCall = ServerRegCall + {internalServerRegCall :: C.Call, + requestMetadataRecvReg :: Ptr C.MetadataArray, + optionalPayload :: Ptr C.ByteBuffer, + parentPtrReg :: Maybe (Ptr C.Call), + callDeadline :: C.CTimeSpecPtr + } + +serverRegCallGetMetadata :: ServerRegCall -> IO MetadataMap +serverRegCallGetMetadata ServerRegCall{..} = do + marray <- peek requestMetadataRecvReg + C.getAllMetadataArray marray + +-- | Extract the client request body from the given registered call, if present. +-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library +-- calls the underlying out parameter "optional_payload". I am not sure exactly +-- in what cases it won't be present. +serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) +serverRegCallGetPayload ServerRegCall{..} = do + bb@(C.ByteBuffer rawPtr) <- peek optionalPayload + if rawPtr == nullPtr + then return Nothing + else Just <$> C.copyByteBufferToByteString bb + +-- | Represents one unregistered GRPC call on the server. +-- Contains pointers to all the C state needed to respond to an unregistered +-- call. +data ServerUnregCall = ServerUnregCall + {internalServerUnregCall :: C.Call, + requestMetadataRecvUnreg :: Ptr C.MetadataArray, + parentPtrUnreg :: Maybe (Ptr C.Call), + callDetails :: C.CallDetails} + +serverUnregCallGetMetadata :: ServerUnregCall -> IO MetadataMap +serverUnregCallGetMetadata ServerUnregCall{..} = do + marray <- peek requestMetadataRecvUnreg + C.getAllMetadataArray marray + +serverUnregCallGetMethodName :: ServerUnregCall -> IO MethodName +serverUnregCallGetMethodName ServerUnregCall{..} = + MethodName <$> C.callDetailsGetMethod callDetails + +debugClientCall :: ClientCall -> IO () +{-# INLINE debugClientCall #-} #ifdef DEBUG -debugCall (ClientCall (C.Call ptr)) = +debugClientCall (ClientCall (C.Call ptr)) = grpcDebug $ "debugCall: client call: " ++ (show ptr) -debugCall call@(ServerCall (C.Call ptr) _ _ _ _ _) = do - grpcDebug $ "debugCall: server call: " ++ (show ptr) - grpcDebug $ "debugCall: metadata ptr: " ++ show (requestMetadataRecv call) - metadataArr <- peek (requestMetadataRecv call) - metadata <- C.getAllMetadataArray metadataArr - grpcDebug $ "debugCall: metadata received: " ++ (show metadata) - forM_ (optionalPayload call) $ \payloadPtr -> do - grpcDebug $ "debugCall: payload ptr: " ++ show payloadPtr - payload <- peek payloadPtr - bs <- C.copyByteBufferToByteString payload - grpcDebug $ "debugCall: payload contents: " ++ show bs - forM_ (parentPtr call) $ \parentPtr' -> do - grpcDebug $ "debugCall: parent ptr: " ++ show parentPtr' - (C.Call parent) <- peek parentPtr' - grpcDebug $ "debugCall: parent: " ++ show parent - forM_ (callDetails call) $ \(C.CallDetails callDetailsPtr) -> do - grpcDebug $ "debugCall: callDetails ptr: " ++ show callDetailsPtr - --TODO: need functions for getting data out of call_details. - forM_ (callDeadline call) $ \timespecptr -> do - grpcDebug $ "debugCall: deadline ptr: " ++ show timespecptr - timespec <- peek timespecptr - grpcDebug $ "debugCall: deadline: " ++ show (C.timeSpec timespec) #else -{-# INLINE debugCall #-} -debugCall = const $ return () +debugClientCall = const $ return () #endif --- | Destroys a 'Call'. -destroyCall :: Call -> IO () -destroyCall ClientCall{..} = do - grpcDebug "Destroying client-side call object." - C.grpcCallDestroy internalCall -destroyCall call@ServerCall{..} = do - grpcDebug "destroyCall: entered." - debugCall call - grpcDebug $ "Destroying server-side call object: " ++ show internalCall - C.grpcCallDestroy internalCall - grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecv - C.metadataArrayDestroy requestMetadataRecv - grpcDebug $ "destroying optional payload" ++ show optionalPayload - forM_ optionalPayload C.destroyReceivingByteBuffer - grpcDebug $ "freeing parentPtr: " ++ show parentPtr - forM_ parentPtr free - grpcDebug $ "destroying call details" ++ show callDetails - forM_ callDetails C.destroyCallDetails - grpcDebug $ "destroying deadline." ++ show callDeadline - forM_ callDeadline C.timespecDestroy +debugServerRegCall :: ServerRegCall -> IO () +#ifdef DEBUG +debugServerRegCall call@(ServerRegCall (C.Call ptr) _ _ _ _) = do + grpcDebug $ "debugServerRegCall: server call: " ++ (show ptr) + grpcDebug $ "debugServerRegCall: metadata ptr: " + ++ show (requestMetadataRecvReg call) + metadataArr <- peek (requestMetadataRecvReg call) + metadata <- C.getAllMetadataArray metadataArr + grpcDebug $ "debugServerRegCall: metadata received: " ++ (show metadata) + grpcDebug $ "debugServerRegCall: payload ptr: " ++ show (optionalPayload call) + payload <- peek (optionalPayload call) + bs <- C.copyByteBufferToByteString payload + grpcDebug $ "debugServerRegCall: payload contents: " ++ show bs + forM_ (parentPtrReg call) $ \parentPtr' -> do + grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + (C.Call parent) <- peek parentPtr' + grpcDebug $ "debugServerRegCall: parent: " ++ show parent + grpcDebug $ "debugServerRegCall: deadline ptr: " ++ show (callDeadline call) + timespec <- peek (callDeadline call) + grpcDebug $ "debugServerRegCall: deadline: " ++ show (C.timeSpec timespec) +#else +{-# INLINE debugServerRegCall #-} +debugServerRegCall = const $ return () +#endif -_nowarn_unused :: a -_nowarn_unused = - castPtr `undefined` - (peek :: Ptr Int -> IO Int) `undefined` - () +debugServerUnregCall :: ServerUnregCall -> IO () +#ifdef DEBUG +debugServerUnregCall call@(ServerUnregCall (C.Call ptr) _ _ _) = do + grpcDebug $ "debugServerUnregCall: server call: " ++ (show ptr) + grpcDebug $ "debugServerUnregCall: metadata ptr: " + ++ show (requestMetadataRecvUnreg call) + metadataArr <- peek (requestMetadataRecvUnreg call) + metadata <- C.getAllMetadataArray metadataArr + grpcDebug $ "debugServerUnregCall: metadata received: " ++ (show metadata) + forM_ (parentPtrUnreg call) $ \parentPtr' -> do + grpcDebug $ "debugServerRegCall: parent ptr: " ++ show parentPtr' + (C.Call parent) <- peek parentPtr' + grpcDebug $ "debugServerRegCall: parent: " ++ show parent + grpcDebug $ "debugServerUnregCall: callDetails ptr: " + ++ show (callDetails call) + --TODO: need functions for getting data out of call_details. +#else +{-# INLINE debugServerUnregCall #-} +debugServerUnregCall = const $ return () +#endif + + +destroyClientCall :: ClientCall -> IO () +destroyClientCall ClientCall{..} = do + grpcDebug "Destroying client-side call object." + C.grpcCallDestroy internalClientCall + +destroyServerRegCall :: ServerRegCall -> IO () +destroyServerRegCall call@ServerRegCall{..} = do + grpcDebug "destroyServerRegCall: entered." + debugServerRegCall call + grpcDebug $ "Destroying server-side call object: " + ++ show internalServerRegCall + C.grpcCallDestroy internalServerRegCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvReg + C.metadataArrayDestroy requestMetadataRecvReg + grpcDebug $ "destroying optional payload" ++ show optionalPayload + C.destroyReceivingByteBuffer optionalPayload + grpcDebug $ "freeing parentPtr: " ++ show parentPtrReg + forM_ parentPtrReg free + grpcDebug $ "destroying deadline." ++ show callDeadline + C.timespecDestroy callDeadline + +destroyServerUnregCall :: ServerUnregCall -> IO () +destroyServerUnregCall call@ServerUnregCall{..} = do + grpcDebug "destroyServerUnregCall: entered." + debugServerUnregCall call + grpcDebug $ "Destroying server-side call object: " + ++ show internalServerUnregCall + C.grpcCallDestroy internalServerUnregCall + grpcDebug $ "destroying metadata array: " ++ show requestMetadataRecvUnreg + C.metadataArrayDestroy requestMetadataRecvUnreg + grpcDebug $ "freeing parentPtrUnreg: " ++ show parentPtrUnreg + forM_ parentPtrUnreg free + grpcDebug $ "destroying call details: " ++ show callDetails + C.destroyCallDetails callDetails diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index efb96ef..717b8c3 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -3,6 +3,7 @@ module Network.GRPC.LowLevel.Client where import Control.Exception (bracket, finally) +import Control.Monad (join) import Data.ByteString (ByteString) import Foreign.Ptr (nullPtr) import qualified Network.GRPC.Unsafe as C @@ -63,7 +64,7 @@ clientRegisterMethod _ _ _ _ = error "Streaming methods not yet implemented." -- Returns 'Left' if the CQ is shutting down or if the job to create a call -- timed out. clientCreateRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ClientCall) clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do let parentCall = C.Call nullPtr --Unsure what this does. null is safe, though. C.withDeadlineSeconds timeout $ \deadline -> do @@ -74,7 +75,7 @@ clientCreateRegisteredCall Client{..} RegisteredMethod{..} timeout = do -- by switching to ExceptT IO. -- | Handles safe creation and cleanup of a client call withClientRegisteredCall :: Client -> RegisteredMethod -> TimeoutSeconds - -> (Call + -> (ClientCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) withClientRegisteredCall client regmethod timeout f = do @@ -83,7 +84,7 @@ withClientRegisteredCall client regmethod timeout f = do Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withClientRegisteredCall: destroying." - >> destroyCall c + >> destroyClientCall c -- | Create a call on the client for an endpoint without using the -- method registration machinery. In practice, we'll probably only use the @@ -94,7 +95,7 @@ clientCreateCall :: Client -> Host -- ^ The host. -> TimeoutSeconds - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ClientCall) clientCreateCall Client{..} method host timeout = do let parentCall = C.Call nullPtr C.withDeadlineSeconds timeout $ \deadline -> do @@ -102,7 +103,7 @@ clientCreateCall Client{..} method host timeout = do clientCQ method host deadline withClientCall :: Client -> MethodName -> Host -> TimeoutSeconds - -> (Call -> IO (Either GRPCIOError a)) + -> (ClientCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) withClientCall client method host timeout f = do createResult <- clientCreateCall client method host timeout @@ -110,7 +111,7 @@ withClientCall client method host timeout f = do Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withClientCall: destroying." - >> destroyCall c + >> destroyClientCall c data NormalRequestResult = NormalRequestResult ByteString @@ -121,26 +122,25 @@ data NormalRequestResult = NormalRequestResult deriving (Show, Eq) -- | Function for assembling call result when the 'MethodType' is 'Normal'. -compileNormalRequestResults :: [OpRecvResult] -> NormalRequestResult +compileNormalRequestResults :: [OpRecvResult] + -> Either GRPCIOError NormalRequestResult compileNormalRequestResults - --TODO: consider using more precise type instead of match. - -- Whether we do so depends on whether this layer of abstraction is supposed - -- to be a safe interface to the gRPC C core library, or something that makes - -- core use cases easy. [OpRecvInitialMetadataResult m, - OpRecvMessageResult body, + OpRecvMessageResult (Just body), OpRecvStatusOnClientResult m2 status details] - = NormalRequestResult body (Just m) m2 status (StatusDetails details) + = Right $ NormalRequestResult body (Just m) m2 status + (StatusDetails details) -- TODO: it seems registered request responses on the server -- don't send initial metadata. Hence the 'Maybe'. Investigate. compileNormalRequestResults - [OpRecvMessageResult body, + [OpRecvMessageResult (Just body), OpRecvStatusOnClientResult m2 status details] - = NormalRequestResult body Nothing m2 status (StatusDetails details) -compileNormalRequestResults _ = - --TODO: impossible case should be enforced by more precise types. - error "non-normal request input to compileNormalRequestResults." - + = Right $ NormalRequestResult body Nothing m2 status (StatusDetails details) +compileNormalRequestResults x = + case extractStatus x of + Nothing -> Left GRPCIOUnknownError + Just (OpRecvStatusOnClientResult _ status details) -> + Left (GRPCIOBadStatusCode status (StatusDetails details)) -- | Make a request of the given method with the given body. Returns the -- server's response. TODO: This is preliminary until we figure out how many @@ -162,37 +162,28 @@ clientRegisteredRequest :: Client -> IO (Either GRPCIOError NormalRequestResult) clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) timeLimit body meta = - case methodType of + fmap join $ case methodType of Normal -> withClientRegisteredCall client rm timeLimit $ \call -> do grpcDebug "clientRegisteredRequest: created call." - debugCall call - --TODO: doing one op at a time to debug. Some were hanging. - let op1 = [OpSendInitialMetadata meta] - res1 <- runOps call clientCQ op1 timeLimit - grpcDebug $ "finished res1: " ++ show res1 - let op2 = [OpSendMessage body] - res2 <- runOps call clientCQ op2 timeLimit - grpcDebug $ "finished res2: " ++ show res2 - let op3 = [OpSendCloseFromClient] - res3 <- runOps call clientCQ op3 timeLimit - grpcDebug $ "finished res3: " ++ show res3 - let op4 = [OpRecvMessage] - res4 <- runOps call clientCQ op4 timeLimit - grpcDebug $ "finished res4: " ++ show res4 - let op5 = [OpRecvStatusOnClient] - res5 <- runOps call clientCQ op5 timeLimit - grpcDebug $ "finished res5: " ++ show res5 - let results = do - r1 <- res1 - r2 <- res2 - r3 <- res3 - r4 <- res4 - r5 <- res5 - return $ r1 ++ r2 ++ r3 ++ r4 ++ r5 - case results of - Left x -> return $ Left x - Right rs -> return $ - Right $ compileNormalRequestResults rs + debugClientCall call + -- NOTE: sendOps and recvOps *must* be in separate batches or + -- the client hangs when the server can't be reached. + let sendOps = [OpSendInitialMetadata meta + , OpSendMessage body + , OpSendCloseFromClient] + sendRes <- runClientOps call clientCQ sendOps timeLimit + case sendRes of + Left x -> do grpcDebug "clientRegisteredRequest: batch error." + return $ Left x + Right rs -> do + let recvOps = [OpRecvMessage, OpRecvStatusOnClient] + recvRes <- runClientOps call clientCQ recvOps timeLimit + case recvRes of + Left x -> do + grpcDebug "clientRegisteredRequest: batch error." + return $ Left x + Right rs' -> do + return $ Right $ compileNormalRequestResults (rs ++ rs') _ -> error "Streaming methods not yet implemented." -- | Makes a normal (non-streaming) request without needing to register a method @@ -210,16 +201,16 @@ clientRequest :: Client -- ^ Request metadata. -> IO (Either GRPCIOError NormalRequestResult) clientRequest client@(Client{..}) (MethodName method) (Host host) - timeLimit body meta = do + timeLimit body meta = + fmap join $ withClientCall client (MethodName method) (Host host) timeLimit $ \call -> do let ops = clientNormalRequestOps body meta - results <- runOps call clientCQ ops timeLimit + results <- runClientOps call clientCQ ops timeLimit grpcDebug "clientRequest: ops ran." case results of Left x -> return $ Left x Right rs -> return $ Right $ compileNormalRequestResults rs - clientNormalRequestOps :: ByteString -> MetadataMap -> [Op] clientNormalRequestOps body metadata = [OpSendInitialMetadata metadata, diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 3310734..542e3a2 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -224,7 +224,8 @@ shutdownCompletionQueue (CompletionQueue{..}) = do channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask -> CompletionQueue -> C.CallHandle - -> C.CTimeSpecPtr -> IO (Either GRPCIOError Call) + -> C.CTimeSpecPtr + -> IO (Either GRPCIOError ClientCall) channelCreateRegisteredCall chan parent mask cq@CompletionQueue{..} handle deadline = withPermission Push cq $ do @@ -238,7 +239,7 @@ channelCreateRegisteredCall channelCreateCall :: C.Channel -> C.Call -> C.PropagationMask -> CompletionQueue -> MethodName -> Host -> C.CTimeSpecPtr - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ClientCall) channelCreateCall chan parent mask cq@CompletionQueue{..} (MethodName methodName) (Host host) deadline = @@ -250,7 +251,7 @@ channelCreateCall -- | Create the call object to handle a registered call. serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds -> RegisteredMethod - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ServerRegCall) serverRequestRegisteredCall server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} = withPermission Push cq $ do @@ -280,9 +281,8 @@ serverRequestRegisteredCall return $ Left x Right () -> do rawCall <- peek callPtr - let assembledCall = ServerCall rawCall metadataArrayPtr - (Just bbPtr) Nothing Nothing - (Just deadline) + let assembledCall = ServerRegCall rawCall metadataArrayPtr + bbPtr Nothing deadline return $ Right assembledCall -- TODO: see TODO for failureCleanup in serverRequestCall. where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do @@ -294,7 +294,7 @@ serverRequestRegisteredCall free bbPtr serverRequestCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ServerUnregCall) serverRequestCall server cq@CompletionQueue{..} timeLimit = withPermission Push cq $ do callPtr <- malloc @@ -321,12 +321,10 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit = return $ Left x Right () -> do rawCall <- peek callPtr - let call = ServerCall rawCall - metadataArrayPtr - Nothing - Nothing - (Just callDetails) - Nothing + let call = ServerUnregCall rawCall + metadataArrayPtr + Nothing + callDetails return $ Right call --TODO: the gRPC library appears to hold onto these pointers for a random diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index dedf1ba..65529d6 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -10,12 +10,20 @@ import Control.Monad.Except (ExceptT(..), runExceptT, throwError, MonadError) -} import Control.Exception +import qualified Data.ByteString as B +import qualified Data.Map as M +import Data.String (IsString) import qualified Network.GRPC.Unsafe as C +import qualified Network.GRPC.Unsafe.Op as C #ifdef DEBUG import GHC.Conc (myThreadId) #endif +type MetadataMap = M.Map B.ByteString B.ByteString + +newtype StatusDetails = StatusDetails B.ByteString deriving (Show, Eq, IsString) + -- | Functions as a proof that the gRPC core has been started. The gRPC core -- must be initialized to create any gRPC state, so this is a requirement for -- the server and client create/start functions. @@ -39,6 +47,7 @@ data GRPCIOError = GRPCIOCallError C.CallError -- ^ Thrown if a 'CompletionQueue' fails to shut down in a -- reasonable amount of time. | GRPCIOUnknownError + | GRPCIOBadStatusCode C.StatusCode StatusDetails deriving (Show, Eq) throwIfCallError :: C.CallError -> Either GRPCIOError () diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 938f2de..3f5747a 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -14,7 +14,7 @@ import Foreign.Marshal.Alloc (free, malloc, mallocBytes) import Foreign.Ptr (Ptr, nullPtr) import Foreign.Storable (peek, poke) -import qualified Network.GRPC.Unsafe as C () +import qualified Network.GRPC.Unsafe as C (Call) import qualified Network.GRPC.Unsafe.ByteBuffer as C import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C @@ -23,10 +23,6 @@ import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.CompletionQueue import Network.GRPC.LowLevel.GRPC -type MetadataMap = M.Map B.ByteString B.ByteString - -newtype StatusDetails = StatusDetails B.ByteString deriving (Show, Eq, IsString) - -- | Sum describing all possible send and receive operations that can be batched -- and executed by gRPC. Usually these are processed in a handful of -- combinations depending on the 'MethodType' of the call being run. @@ -146,7 +142,9 @@ withOpArray n = bracket (C.opArrayCreate n) -- | Container holding GC-managed results for 'Op's which receive data. data OpRecvResult = OpRecvInitialMetadataResult MetadataMap - | OpRecvMessageResult B.ByteString + | OpRecvMessageResult (Maybe B.ByteString) + -- ^ If the client or server dies, we might not receive a response body, in + -- which case this will be 'Nothing'. | OpRecvStatusOnClientResult MetadataMap C.StatusCode B.ByteString | OpRecvCloseOnServerResult Bool -- ^ True if call was cancelled. deriving (Eq, Show) @@ -162,11 +160,12 @@ resultFromOpContext (OpRecvInitialMetadataContext pmetadata) = do return $ Just $ OpRecvInitialMetadataResult metadataMap resultFromOpContext (OpRecvMessageContext pbb) = do grpcDebug "resultFromOpContext: OpRecvMessageContext" - bb <- peek pbb - grpcDebug "resultFromOpContext: bytebuffer peeked." - bs <- C.copyByteBufferToByteString bb - grpcDebug "resultFromOpContext: bb copied." - return $ Just $ OpRecvMessageResult bs + bb@(C.ByteBuffer bbptr) <- peek pbb + if bbptr == nullPtr + then return $ Just $ OpRecvMessageResult Nothing + else do bs <- C.copyByteBufferToByteString bb + grpcDebug "resultFromOpContext: bb copied." + return $ Just $ OpRecvMessageResult (Just bs) resultFromOpContext (OpRecvStatusOnClientContext pmetadata pcode pstr) = do grpcDebug "resultFromOpContext: OpRecvStatusOnClientContext" metadata <- peek pmetadata @@ -186,23 +185,18 @@ resultFromOpContext _ = do --TODO: the list of 'Op's type is less specific than it could be. There are only -- a few different sequences of 'Op's we will see in practice. Once we figure --- out what those are, we should create a more specific sum type. This will also --- allow us to make a more specific sum type to replace @[OpRecvResult]@, too. +-- out what those are, we should create a more specific sum type. However, since +-- ops can fail, the list of 'OpRecvResult' returned by 'runOps' can vary in +-- their contents and are perhaps less amenable to simplification. +-- In the meantime, from looking at the core tests, it looks like it is safe to +-- say that we always get a GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use +-- the same 'Op' twice in the same batch, so we might want to change the list to +-- a set. I don't think order matters within a batch. Need to check. --- | For a given call, run the given 'Op's on the given completion queue with --- the given tag. Blocks until the ops are complete or the given number of --- seconds have elapsed. -runOps :: Call - -- ^ 'Call' that this batch is associated with. One call can be - -- associated with many batches. +runOps :: C.Call -> CompletionQueue - -- ^ Queue on which our tag will be placed once our ops are done - -- running. -> [Op] -> TimeoutSeconds - -- ^ How long to block waiting for the tag to appear on the queue. - -- If we time out, the result of this action will be - -- @CallBatchError BatchTimeout@. -> IO (Either GRPCIOError [OpRecvResult]) runOps call cq ops timeLimit = let l = length ops in @@ -212,7 +206,7 @@ runOps call cq ops timeLimit = grpcDebug $ "runOps: allocated op contexts: " ++ show contexts sequence_ $ zipWith (setOpArray opArray) [0..l-1] contexts tag <- newTag cq - callError <- startBatch cq (internalCall call) opArray l tag + callError <- startBatch cq call opArray l tag grpcDebug $ "runOps: called start_batch. callError: " ++ (show callError) case callError of @@ -226,5 +220,44 @@ runOps call cq ops timeLimit = fmap (Right . catMaybes) $ mapM resultFromOpContext contexts Left err -> return $ Left err -_nowarn_unused :: a -_nowarn_unused = undefined nullPtr +-- | For a given call, run the given 'Op's on the given completion queue with +-- the given tag. Blocks until the ops are complete or the given number of +-- seconds have elapsed. +-- TODO: now that 'ServerRegCall' and 'ServerUnregCall' are separate types, we +-- could try to limit the input 'Op's more appropriately. E.g., we don't use +-- an 'OpRecvInitialMetadata' when receiving a registered call, because gRPC +-- handles that for us. +runServerRegOps :: ServerRegCall + -- ^ 'Call' that this batch is associated with. One call can be + -- associated with many batches. + -> CompletionQueue + -- ^ Queue on which our tag will be placed once our ops are done + -- running. + -> [Op] + -- ^ The list of 'Op's to execute. + -> TimeoutSeconds + -- ^ How long to block waiting for the tag to appear on the + --queue. If we time out, the result of this action will be + -- @CallBatchError BatchTimeout@. + -> IO (Either GRPCIOError [OpRecvResult]) +runServerRegOps = runOps . internalServerRegCall + +runServerUnregOps :: ServerUnregCall + -> CompletionQueue + -> [Op] + -> TimeoutSeconds + -> IO (Either GRPCIOError [OpRecvResult]) +runServerUnregOps = runOps . internalServerUnregCall + +-- | Like 'runServerOps', but for client-side calls. +runClientOps :: ClientCall + -> CompletionQueue + -> [Op] + -> TimeoutSeconds + -> IO (Either GRPCIOError [OpRecvResult]) +runClientOps = runOps . internalClientCall + +extractStatus :: [OpRecvResult] -> Maybe OpRecvResult +extractStatus [] = Nothing +extractStatus (res@(OpRecvStatusOnClientResult _ _ _):_) = Just res +extractStatus (_:xs) = extractStatus xs diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index d541ac1..4a36a78 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -6,9 +6,7 @@ import Control.Concurrent (threadDelay) import Control.Exception (bracket, finally) import Control.Monad import Data.ByteString (ByteString) -import qualified Data.Map as M import Foreign.Ptr (nullPtr) -import Foreign.Storable (peek) import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.Op as C @@ -25,9 +23,6 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, import Network.GRPC.LowLevel.GRPC import Network.GRPC.LowLevel.Op -import qualified Network.GRPC.Unsafe.ByteBuffer as C -import qualified Network.GRPC.Unsafe.Metadata as C - -- | Wraps various gRPC state needed to run a server. data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue, registeredMethods :: [RegisteredMethod]} @@ -123,12 +118,12 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- | Create a 'Call' with which to wait for the invocation of a registered -- method. serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> IO (Either GRPCIOError Call) + -> IO (Either GRPCIOError ServerRegCall) serverCreateRegisteredCall Server{..} rm timeLimit = serverRequestRegisteredCall internalServer serverCQ timeLimit rm withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds - -> (Call + -> (ServerRegCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) withServerRegisteredCall server regmethod timeout f = do @@ -137,23 +132,23 @@ withServerRegisteredCall server regmethod timeout f = do Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerRegisteredCall: destroying." - >> destroyCall c + >> destroyServerRegCall c -serverCreateCall :: Server -> TimeoutSeconds - -> IO (Either GRPCIOError Call) -serverCreateCall Server{..} timeLimit = +serverCreateUnregCall :: Server -> TimeoutSeconds + -> IO (Either GRPCIOError ServerUnregCall) +serverCreateUnregCall Server{..} timeLimit = serverRequestCall internalServer serverCQ timeLimit -withServerCall :: Server -> TimeoutSeconds - -> (Call -> IO (Either GRPCIOError a)) +withServerUnregCall :: Server -> TimeoutSeconds + -> (ServerUnregCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withServerCall server timeout f = do - createResult <- serverCreateCall server timeout +withServerUnregCall server timeout f = do + createResult <- serverCreateUnregCall server timeout case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call where logDestroy c = grpcDebug "withServerCall: destroying." - >> destroyCall c + >> destroyServerUnregCall c -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. serverOpsGetNormalCall :: MetadataMap -> [Op] @@ -211,19 +206,19 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do -- anyway. withServerRegisteredCall s rm timeLimit $ \call -> do grpcDebug "serverHandleNormalRegisteredCall: starting batch." - debugCall call - case optionalPayload call of - Nothing -> error "Impossible: not a registered call." --TODO: better types - Just payloadPtr -> do - payload <- peek payloadPtr - requestBody <- C.copyByteBufferToByteString payload - metadataArray <- peek $ requestMetadataRecv call - metadata <- C.getAllMetadataArray metadataArray - (respBody, initMeta, trailingMeta, details) <- f requestBody metadata + debugServerRegCall call + payload <- serverRegCallGetPayload call + case payload of + --TODO: what should we do with an empty payload? Have the handler take + -- @Maybe ByteString@? Need to figure out when/why payload would be empty. + Nothing -> error "serverHandleNormalRegisteredCall: payload empty." + Just requestBody -> do + requestMeta <- serverRegCallGetMetadata call + (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta let status = C.GrpcStatusOk let respOps = serverOpsSendNormalRegisteredResponse respBody initMeta trailingMeta status details - respOpsResults <- runOps call serverCQ respOps timeLimit + respOpsResults <- runServerRegOps call serverCQ respOps timeLimit grpcDebug "serverHandleNormalRegisteredCall: finished response ops." case respOpsResults of Left x -> return $ Left x @@ -235,25 +230,28 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do serverHandleNormalCall :: Server -> TimeoutSeconds -> MetadataMap -- ^ Initial server metadata. - -> (ByteString -> MetadataMap + -> (ByteString -> MetadataMap -> MethodName -> IO (ByteString, MetadataMap, StatusDetails)) -- ^ Handler function takes a request body and -- metadata and returns a response body and metadata. -> IO (Either GRPCIOError ()) serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do - withServerCall s timeLimit $ \call -> do + withServerUnregCall s timeLimit $ \call -> do grpcDebug "serverHandleNormalCall: starting batch." let recvOps = serverOpsGetNormalCall srvMetadata - opResults <- runOps call serverCQ recvOps timeLimit + opResults <- runServerUnregOps call serverCQ recvOps timeLimit case opResults of - Left x -> return $ Left x - Right [OpRecvMessageResult body] -> do - --TODO: we need to get client metadata - (respBody, respMetadata, details) <- f body M.empty + Left x -> do grpcDebug "serverHandleNormalCall: ops failed; aborting" + return $ Left x + Right [OpRecvMessageResult (Just body)] -> do + requestMeta <- serverUnregCallGetMetadata call + grpcDebug $ "got client metadata: " ++ show requestMeta + methodName <- serverUnregCallGetMethodName call + (respBody, respMetadata, details) <- f body requestMeta methodName let status = C.GrpcStatusOk let respOps = serverOpsSendNormalResponse respBody respMetadata status details - respOpsResults <- runOps call serverCQ respOps timeLimit + respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit case respOpsResults of Left x -> do grpcDebug "serverHandleNormalCall: resp failed." return $ Left x diff --git a/src/Network/GRPC/Unsafe.chs b/src/Network/GRPC/Unsafe.chs index fa3f9f2..8e0ee94 100644 --- a/src/Network/GRPC/Unsafe.chs +++ b/src/Network/GRPC/Unsafe.chs @@ -263,3 +263,9 @@ castPeek p = peek (castPtr p) `MetadataArray', id `Ptr ByteBuffer', `CompletionQueue', `CompletionQueue',unTag `Tag'} -> `CallError'#} + +{#fun call_details_get_method as ^ {`CallDetails'} -> `String'#} + +{#fun call_details_get_host as ^ {`CallDetails'} -> `String'#} + +{#fun call_details_get_deadline as ^ {`CallDetails'} -> `CTimeSpec' peek* #} diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 5f01a80..035741c 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -84,8 +84,10 @@ payloadLowLevelServerUnregistered :: TestServer payloadLowLevelServerUnregistered = TestServer $ \grpc -> do withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> do result <- serverHandleNormalCall server 11 M.empty $ - \_reqBody _reqMeta -> return ("reply test", M.empty, - StatusDetails "details string") + \reqBody _reqMeta reqMethod -> do + reqBody @?= "Hello!" + reqMethod @?= "/foo" + return ("reply test", M.empty, StatusDetails "details string") case result of Left x -> error $ show x Right _ -> return () @@ -115,7 +117,7 @@ testServerUnregisteredAwaitNoClient = let conf = ServerConfig "localhost" 50051 [] withServer grpc conf $ \server -> do result <- serverHandleNormalCall server 10 M.empty $ - \_ _ -> return ("", M.empty, StatusDetails "") + \_ _ _ -> return ("", M.empty, StatusDetails "") case result of Left err -> error $ show err Right _ -> return () @@ -135,7 +137,7 @@ testWithServerCall = grpcTest "Server - Create/destroy call" $ \grpc -> do let conf = ServerConfig "localhost" 50051 [] withServer grpc conf $ \server -> do - result <- withServerCall server 1 $ const $ return $ Right () + result <- withServerUnregCall server 1 $ const $ return $ Right () result @?= Left GRPCIOTimeout testWithClientCall :: TestTree