diff --git a/cbits/grpc_haskell.c b/cbits/grpc_haskell.c index 14995a7..051bde8 100644 --- a/cbits/grpc_haskell.c +++ b/cbits/grpc_haskell.c @@ -133,6 +133,12 @@ gpr_timespec* infinite_deadline(){ return retval; } +gpr_timespec* convert_clock_type(gpr_timespec *t, gpr_clock_type to){ + gpr_timespec *retval = malloc(sizeof(gpr_timespec)); + *retval = gpr_convert_clock_type(*t, to); + return retval; +} + grpc_metadata_array** metadata_array_create(){ grpc_metadata_array **retval = malloc(sizeof(grpc_metadata_array*)); *retval = malloc(sizeof(grpc_metadata_array)); diff --git a/include/grpc_haskell.h b/include/grpc_haskell.h index e03f884..719b466 100644 --- a/include/grpc_haskell.h +++ b/include/grpc_haskell.h @@ -54,6 +54,8 @@ gpr_timespec* millis_to_deadline(int64_t millis); gpr_timespec* infinite_deadline(); +gpr_timespec* convert_clock_type(gpr_timespec *t, gpr_clock_type to); + grpc_metadata_array** metadata_array_create(); void metadata_array_destroy(grpc_metadata_array **arr); diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index dfa8155..0e1b434 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -35,6 +35,7 @@ GRPC , serverHandleNormalCall , withServerCall , serverCallCancel +, serverCallIsExpired -- * Client , ClientConfig(..) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index a74faf8..aa8a25d 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -12,6 +12,7 @@ import Data.String (IsString) import Foreign.Marshal.Alloc (free) import Foreign.Ptr (Ptr, nullPtr) import Foreign.Storable (peek) +import System.Clock import qualified Network.GRPC.Unsafe as C import qualified Network.GRPC.Unsafe.ByteBuffer as C @@ -67,7 +68,8 @@ data ServerCall = ServerCall requestMetadataRecv :: Ptr C.MetadataArray, optionalPayload :: Ptr C.ByteBuffer, parentPtr :: Maybe (Ptr C.Call), - callDeadline :: C.CTimeSpecPtr + callDeadlinePtr :: C.CTimeSpecPtr, + callDeadline :: TimeSpec } serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO () @@ -92,6 +94,11 @@ serverCallGetPayload ServerCall{..} = do then return Nothing else Just <$> C.copyByteBufferToByteString bb +serverCallIsExpired :: ServerCall -> IO Bool +serverCallIsExpired sc = do + currTime <- getTime Monotonic + return $ currTime > (callDeadline sc) + debugClientCall :: ClientCall -> IO () {-# INLINE debugClientCall #-} #ifdef DEBUG @@ -103,7 +110,7 @@ debugClientCall = const $ return () debugServerCall :: ServerCall -> IO () #ifdef DEBUG -debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do +debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _ _) = do grpcDebug $ "debugServerCall(R): server call: " ++ (show ptr) grpcDebug $ "debugServerCall(R): metadata ptr: " ++ show (requestMetadataRecv call) @@ -119,7 +126,7 @@ debugServerCall call@(ServerCall (C.Call ptr) _ _ _ _) = do (C.Call parent) <- peek parentPtr' grpcDebug $ "debugServerCall(R): parent: " ++ show parent grpcDebug $ "debugServerCall(R): deadline ptr: " ++ show (callDeadline call) - timespec <- peek (callDeadline call) + timespec <- peek (callDeadlinePtr call) grpcDebug $ "debugServerCall(R): deadline: " ++ show (C.timeSpec timespec) #else {-# INLINE debugServerCall #-} @@ -144,4 +151,4 @@ destroyServerCall call@ServerCall{..} = do grpcDebug $ "freeing parentPtr: " ++ show parentPtr forM_ parentPtr free grpcDebug $ "destroying deadline." ++ show callDeadline - C.timespecDestroy callDeadline + C.timespecDestroy callDeadlinePtr diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index a3eb0ff..ec060fd 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -44,6 +44,8 @@ import qualified Network.GRPC.Unsafe.Constants as C import qualified Network.GRPC.Unsafe.Metadata as C import qualified Network.GRPC.Unsafe.Op as C import qualified Network.GRPC.Unsafe.Time as C +import System.Clock (getTime, Clock(..), + TimeSpec(..)) import System.Timeout (timeout) import Network.GRPC.LowLevel.Call @@ -135,9 +137,8 @@ serverRequestCall :: C.Server serverRequestCall server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} = withPermission Push cq $ do - -- TODO: Is gRPC supposed to populate this deadline? -- NOTE: the below stuff is freed when we free the call we return. - deadline <- C.secondsToDeadline timeLimit + deadlinePtr <- malloc callPtr <- malloc metadataArrayPtr <- C.metadataArrayCreate metadataArray <- peek metadataArrayPtr @@ -145,25 +146,26 @@ serverRequestCall tag <- newTag cq grpcDebug $ "serverRequestCall(R): tag is " ++ show tag callError <- C.grpcServerRequestRegisteredCall - server methodHandle callPtr deadline + server methodHandle callPtr deadlinePtr metadataArray bbPtr unsafeCQ unsafeCQ tag grpcDebug $ "serverRequestCall(R): callError: " ++ show callError if callError /= C.CallOk then do grpcDebug "serverRequestCall(R): callError. cleaning up" - failureCleanup deadline callPtr metadataArrayPtr bbPtr + failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr return $ Left $ GRPCIOCallError callError else do pluckResult <- pluck cq tag (Just timeLimit) grpcDebug "serverRequestCall(R): finished pluck." case pluckResult of Left x -> do grpcDebug "serverRequestCall(R): cleanup pluck err" - failureCleanup deadline callPtr metadataArrayPtr bbPtr + failureCleanup deadlinePtr callPtr metadataArrayPtr bbPtr return $ Left x Right () -> do rawCall <- peek callPtr + deadline <- convertDeadline deadlinePtr let assembledCall = ServerCall rawCall metadataArrayPtr - bbPtr Nothing deadline + bbPtr Nothing deadlinePtr deadline return $ Right assembledCall --TODO: the gRPC library appears to hold onto these pointers for a random -- amount of time, even after returning from the only call that uses them. @@ -178,6 +180,12 @@ serverRequestCall free callPtr C.metadataArrayDestroy metadataArrayPtr free bbPtr + convertDeadline deadline = do + --gRPC gives us a deadline that is just a delta, so we convert it + --to a proper deadline. + deadline' <- C.timeSpec <$> peek deadline + now <- getTime Monotonic + return $ now + deadline' -- | Register the server's completion queue. Must be done before the server is -- started. diff --git a/src/Network/GRPC/Unsafe/Time.chs b/src/Network/GRPC/Unsafe/Time.chs index eed1aa6..7471ae8 100644 --- a/src/Network/GRPC/Unsafe/Time.chs +++ b/src/Network/GRPC/Unsafe/Time.chs @@ -56,3 +56,11 @@ withDeadlineSeconds i = bracket (secondsToDeadline i) timespecDestroy withInfiniteDeadline :: (CTimeSpecPtr -> IO a) -> IO a withInfiniteDeadline = bracket infiniteDeadline timespecDestroy + +{#fun convert_clock_type as ^ {`CTimeSpecPtr', `ClockType'} -> `CTimeSpecPtr'#} + +withConvertedClockType :: CTimeSpecPtr -> ClockType + -> (CTimeSpecPtr -> IO a) + -> IO a +withConvertedClockType cptr ctype = bracket (convertClockType cptr ctype) + timespecDestroy diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 3ea7ffd..f472324 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -38,6 +38,7 @@ lowLevelTests = testGroup "Unit tests of low-level Haskell library" , testServerCancel , testGoaway , testSlowServer + , testServerCallExpirationCheck ] testGRPCBracket :: TestTree @@ -243,6 +244,28 @@ testSlowServer = return ("", mempty, StatusOk, StatusDetails "") return () +testServerCallExpirationCheck :: TestTree +testServerCallExpirationCheck = + csTest "Check for call expiration" client server [("/foo", Normal)] + where + client c = do + rm <- clientRegisterMethod c "/foo" Normal + result <- clientRequest c rm 3 "" mempty + return () + server s = do + let rm = head (registeredMethods s) + serverHandleNormalCall s rm 5 mempty $ \c _ _ -> do + exp1 <- serverCallIsExpired c + assertBool "Call isn't expired when handler starts" $ not exp1 + threadDelaySecs 1 + exp2 <- serverCallIsExpired c + assertBool "Call isn't expired after 1 second" $ not exp2 + threadDelaySecs 3 + exp3 <- serverCallIsExpired c + assertBool "Call is expired after 4 seconds" exp3 + return ("", mempty, StatusDetails "") + return () + -------------------------------------------------------------------------------- -- Utilities and helpers @@ -328,3 +351,7 @@ stdTestServer = TestServer . stdServerConf stdServerConf :: [(MethodName, GRPCMethodType)] -> ServerConfig stdServerConf = ServerConfig "localhost" 50051 + + +threadDelaySecs :: Int -> IO () +threadDelaySecs = threadDelay . (* 10^(6::Int))