diff --git a/cbits/grpc_haskell.c b/cbits/grpc_haskell.c
index bdffed8..14995a7 100644
--- a/cbits/grpc_haskell.c
+++ b/cbits/grpc_haskell.c
@@ -127,6 +127,12 @@ gpr_timespec* millis_to_deadline(int64_t millis){
   return retval;
 }
 
+gpr_timespec* infinite_deadline(){
+  gpr_timespec *retval = malloc(sizeof(gpr_timespec));
+  *retval = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  return retval;
+}
+
 grpc_metadata_array** metadata_array_create(){
   grpc_metadata_array **retval = malloc(sizeof(grpc_metadata_array*));
   *retval = malloc(sizeof(grpc_metadata_array));
diff --git a/include/grpc_haskell.h b/include/grpc_haskell.h
index 132d4da..e03f884 100644
--- a/include/grpc_haskell.h
+++ b/include/grpc_haskell.h
@@ -52,6 +52,8 @@ gpr_timespec* seconds_to_deadline(int64_t seconds);
 
 gpr_timespec* millis_to_deadline(int64_t millis);
 
+gpr_timespec* infinite_deadline();
+
 grpc_metadata_array** metadata_array_create();
 
 void metadata_array_destroy(grpc_metadata_array **arr);
diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs
index 836fde0..18ebbb3 100644
--- a/src/Network/GRPC/LowLevel/Client.hs
+++ b/src/Network/GRPC/LowLevel/Client.hs
@@ -41,12 +41,14 @@ createClient grpc clientConfig = do
 
 destroyClient :: Client -> IO ()
 destroyClient Client{..} = do
+  grpcDebug "destroyClient: calling grpc_channel_destroy()"
+  C.grpcChannelDestroy clientChannel
+  grpcDebug "destroyClient: shutting down CQ."
   shutdownResult <- shutdownCompletionQueue clientCQ
   case shutdownResult of
     Left x -> do putStrLn $ "Failed to stop client CQ: " ++ show x
                  putStrLn $ "Trying to shut down anyway."
     Right _ -> return ()
-  C.grpcChannelDestroy clientChannel
 
 withClient :: GRPC -> ClientConfig -> (Client -> IO a) -> IO a
 withClient grpc config = bracket (createClient grpc config)
@@ -159,18 +161,18 @@ clientRequest client@(Client{..}) rm@(RegisteredMethod{..})
     let sendOps = [OpSendInitialMetadata meta
                , OpSendMessage body
                , OpSendCloseFromClient]
-    sendRes <- runOps call' clientCQ sendOps timeLimit
+    sendRes <- runOps call' clientCQ sendOps
     case sendRes of
-      Left x -> do grpcDebug "clientRequest(R) : batch error."
+      Left x -> do grpcDebug "clientRequest(R) : batch error sending."
                    return $ Left x
       Right rs -> do
         let recvOps = [OpRecvInitialMetadata,
                        OpRecvMessage,
                        OpRecvStatusOnClient]
-        recvRes <- runOps call' clientCQ recvOps timeLimit
+        recvRes <- runOps call' clientCQ recvOps
         case recvRes of
           Left x -> do
-            grpcDebug "clientRequest(R): batch error."
+            grpcDebug "clientRequest(R): batch error receiving."
            return $ Left x
           Right rs' -> do
             grpcDebug $ "clientRequest(R): got " ++ show rs'
diff --git a/src/Network/GRPC/LowLevel/Client/Unregistered.hs b/src/Network/GRPC/LowLevel/Client/Unregistered.hs
index 1623801..e0a47d7 100644
--- a/src/Network/GRPC/LowLevel/Client/Unregistered.hs
+++ b/src/Network/GRPC/LowLevel/Client/Unregistered.hs
@@ -63,7 +63,7 @@ clientRequest client@Client{..} meth timeLimit body meta =
   fmap join $ do
   withClientCall client meth timeLimit $ \call -> do
     let ops = clientNormalRequestOps body meta
-    results <- runOps (unClientCall call) clientCQ ops timeLimit
+    results <- runOps (unClientCall call) clientCQ ops
     grpcDebug "clientRequest(U): ops ran."
     case results of
       Left x -> return $ Left x
diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs
index c491714..a3eb0ff 100644
--- a/src/Network/GRPC/LowLevel/CompletionQueue.hs
+++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs
@@ -92,14 +92,17 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
   --drain the queue
   C.grpcCompletionQueueShutdown unsafeCQ
   loopRes <- timeout (5*10^(6::Int)) drainLoop
+  grpcDebug $ "Got CQ loop shutdown result of: " ++ show loopRes
   case loopRes of
     Nothing -> return $ Left GRPCIOShutdownFailure
     Just () -> C.grpcCompletionQueueDestroy unsafeCQ >> return (Right ())
 
   where drainLoop :: IO ()
         drainLoop = do
+          grpcDebug "drainLoop: before next() call"
          ev <- C.withDeadlineSeconds 1 $ \deadline ->
                  C.grpcCompletionQueueNext unsafeCQ deadline C.reserved
+          grpcDebug $ "drainLoop: next() call got " ++ show ev
          case (C.eventCompletionType ev) of
            C.QueueShutdown -> return ()
            C.QueueTimeout -> drainLoop
@@ -140,6 +143,7 @@ serverRequestCall
       metadataArray <- peek metadataArrayPtr
       bbPtr <- malloc
       tag <- newTag cq
+      grpcDebug $ "serverRequestCall(R): tag is " ++ show tag
       callError <- C.grpcServerRequestRegisteredCall
                      server methodHandle callPtr deadline metadataArray
                      bbPtr unsafeCQ unsafeCQ tag
@@ -149,7 +153,7 @@ serverRequestCall
         then do grpcDebug "serverRequestCall(R): callError. cleaning up"
                 failureCleanup deadline callPtr metadataArrayPtr bbPtr
                 return $ Left $ GRPCIOCallError callError
-        else do pluckResult <- pluck cq tag timeLimit
+        else do pluckResult <- pluck cq tag (Just timeLimit)
                 grpcDebug "serverRequestCall(R): finished pluck."
                 case pluckResult of
                   Left x -> do
diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs
index 1facf67..6b9090f 100644
--- a/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs
+++ b/src/Network/GRPC/LowLevel/CompletionQueue/Internal.hs
@@ -97,17 +97,23 @@ withPermission op cq f =
 
 -- | Waits for the given number of seconds for the given tag to appear on the
 -- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
--- down and cannot handle new requests.
-pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
+-- down and cannot handle new requests. Note that the timeout is optional. When
+-- doing client ops, provide @Nothing@ and the pluck will automatically fail if
+-- the deadline associated with the 'ClientCall' expires. If plucking
+-- 'serverRequestCall', this will block forever unless a timeout is given.
+pluck :: CompletionQueue -> C.Tag -> Maybe TimeoutSeconds
       -> IO (Either GRPCIOError ())
 pluck cq@CompletionQueue{..} tag waitSeconds = do
   grpcDebug $ "pluck: called with tag: " ++ show tag
               ++ " and wait: " ++ show waitSeconds
-  withPermission Pluck cq $ do
-    C.withDeadlineSeconds waitSeconds $ \deadline -> do
-      ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
-      grpcDebug $ "pluck: finished. Event: " ++ show ev
-      return $ if isEventSuccessful ev then Right () else eventToError ev
+  withPermission Pluck cq $
+    case waitSeconds of
+      Nothing -> C.withInfiniteDeadline go
+      Just seconds -> C.withDeadlineSeconds seconds go
+  where go deadline = do
+          ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
+          grpcDebug $ "pluck: finished. Event: " ++ show ev
+          return $ if isEventSuccessful ev then Right () else eventToError ev
 
 -- | Translate 'C.Event' to an error. The caller is responsible for ensuring
 -- that the event actually corresponds to an error condition; a successful event
diff --git a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs
index 67cc169..bc9d0fe 100644
--- a/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs
+++ b/src/Network/GRPC/LowLevel/CompletionQueue/Unregistered.hs
@@ -48,7 +48,7 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit =
         then do grpcDebug "serverRequestCall: got call error; cleaning up."
                 failureCleanup callPtr callDetails metadataArrayPtr
                 return $ Left $ GRPCIOCallError callError
-        else do pluckResult <- pluck cq tag timeLimit
+        else do pluckResult <- pluck cq tag (Just timeLimit)
                 grpcDebug $ "serverRequestCall: pluckResult was "
                             ++ show pluckResult
                 case pluckResult of
diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs
index aa4106f..17e91aa 100644
--- a/src/Network/GRPC/LowLevel/GRPC.hs
+++ b/src/Network/GRPC/LowLevel/GRPC.hs
@@ -25,7 +25,8 @@ newtype StatusDetails = StatusDetails B.ByteString deriving (Show, Eq, IsString)
 data GRPC = GRPC
 
 withGRPC :: (GRPC -> IO a) -> IO a
-withGRPC = bracket (C.grpcInit >> return GRPC) (const C.grpcShutdown)
+withGRPC = bracket (C.grpcInit >> return GRPC)
+                   (\_ -> grpcDebug "withGRPC: shutting down" >> C.grpcShutdown)
 
 -- | Describes all errors that can occur while running a GRPC-related IO action.
 data GRPCIOError = GRPCIOCallError C.CallError
diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs
index 88fb211..fc1d0ac 100644
--- a/src/Network/GRPC/LowLevel/Op.hs
+++ b/src/Network/GRPC/LowLevel/Op.hs
@@ -130,7 +130,7 @@ freeOpContext (OpRecvCloseOnServerContext pcancelled) =
   >> free pcancelled
 
 -- | Allocates an `OpArray` and a list of `OpContext`s from the given list of
--- `Op`s.
+-- `Op`s.
 withOpArrayAndCtxts :: [Op] -> ((C.OpArray, [OpContext]) -> IO a) -> IO a
 withOpArrayAndCtxts ops = bracket setup teardown
   where setup = do ctxts <- mapM createOpContext ops
@@ -186,8 +186,9 @@ resultFromOpContext _ = do
   return Nothing
 
 -- | For a given call, run the given 'Op's on the given completion queue with
--- the given tag. Blocks until the ops are complete or the given number of
--- seconds have elapsed. TODO: now that we distinguish between different types
+-- the given tag. Blocks until the ops are complete or the deadline on the
+-- associated call has been reached.
+-- TODO: now that we distinguish between different types
 -- of calls at the type level, we could try to limit the input 'Op's more
 -- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a
 -- registered call, because gRPC handles that for us.
@@ -211,23 +212,20 @@ runOps :: C.Call
           -- running.
        -> [Op]
           -- ^ The list of 'Op's to execute.
-       -> TimeoutSeconds
-          -- ^ How long to block waiting for the tag to appear on the queue. If
-          -- we time out, the result of this action will be @CallBatchError
-          -- BatchTimeout@.
        -> IO (Either GRPCIOError [OpRecvResult])
-runOps call cq ops timeLimit =
+runOps call cq ops =
   let l = length ops
       in withOpArrayAndCtxts ops $ \(opArray, contexts) -> do
         grpcDebug $ "runOps: allocated op contexts: " ++ show contexts
         tag <- newTag cq
+        grpcDebug $ "runOps: tag: " ++ show tag
         callError <- startBatch cq call opArray l tag
         grpcDebug $ "runOps: called start_batch. callError: "
                     ++ (show callError)
         case callError of
           Left x -> return $ Left x
           Right () -> do
-            ev <- pluck cq tag timeLimit
+            ev <- pluck cq tag Nothing
             grpcDebug $ "runOps: pluck returned " ++ show ev
             case ev of
               Right () -> do
diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs
index e2c4172..759457f 100644
--- a/src/Network/GRPC/LowLevel/Server.hs
+++ b/src/Network/GRPC/LowLevel/Server.hs
@@ -83,7 +83,9 @@ stopServer (Server server cq _ _) = do
         shutdownNotify = do
           let shutdownTag = C.tag 0
           serverShutdownAndNotify server cq shutdownTag
-          shutdownEvent <- pluck cq shutdownTag 30
+          grpcDebug "called serverShutdownAndNotify; plucking."
+          shutdownEvent <- pluck cq shutdownTag (Just 30)
+          grpcDebug $ "shutdownNotify: got shutdown event" ++ show shutdownEvent
           case shutdownEvent of
             -- This case occurs when we pluck but the queue is already in the
             -- 'shuttingDown' state, implying we already tried to shut down.
@@ -181,10 +183,6 @@ serverHandleNormalCall :: Server
                        -> ServerHandler
                        -> IO (Either GRPCIOError ())
 serverHandleNormalCall s@Server{..} rm timeLimit initMeta f = do
-  -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit.
-  -- Should we just hard-code time limits instead? Not sure if client
-  -- programmer cares, since this function will likely just be put in a loop
-  -- anyway.
   withServerCall s rm timeLimit $ \call -> do
     grpcDebug "serverHandleNormalCall(R): starting batch."
     debugServerCall call
@@ -199,7 +197,7 @@ serverHandleNormalCall s@Server{..} rm timeLimit initMeta f = do
         let status = C.GrpcStatusOk
         let respOps = serverOpsSendNormalRegisteredResponse
                         respBody initMeta trailingMeta status details
-        respOpsResults <- runOps (unServerCall call) serverCQ respOps timeLimit
+        respOpsResults <- runOps (unServerCall call) serverCQ respOps
         grpcDebug "serverHandleNormalCall(R): finished response ops."
         case respOpsResults of
           Left x -> return $ Left x
diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs
index 9340f3c..323c2f3 100644
--- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs
+++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs
@@ -66,7 +66,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
     grpcDebug "serverHandleNormalCall(U): starting batch."
     let recvOps = serverOpsGetNormalCall srvMetadata
         call' = unServerCall call
-    opResults <- runOps call' serverCQ recvOps timeLimit
+    opResults <- runOps call' serverCQ recvOps
     case opResults of
       Left x -> do grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
                    return $ Left x
@@ -80,7 +80,7 @@ serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
         let status = C.GrpcStatusOk
         let respOps = serverOpsSendNormalResponse
                         respBody respMetadata status details
-        respOpsResults <- runOps call' serverCQ respOps timeLimit
+        respOpsResults <- runOps call' serverCQ respOps
         case respOpsResults of
           Left x -> do grpcDebug "serverHandleNormalCall(U): resp failed."
                        return $ Left x
diff --git a/src/Network/GRPC/Unsafe/Time.chs b/src/Network/GRPC/Unsafe/Time.chs
index ab88d8f..eed1aa6 100644
--- a/src/Network/GRPC/Unsafe/Time.chs
+++ b/src/Network/GRPC/Unsafe/Time.chs
@@ -49,3 +49,10 @@ withDeadlineSeconds i = bracket (secondsToDeadline i) timespecDestroy
 -- | Returns a GprClockMonotonic representing a deadline n milliseconds
 -- in the future.
 {#fun millis_to_deadline as ^ {`Int'} -> `CTimeSpecPtr'#}
+
+-- | Returns a GprClockMonotonic representing an infinitely distant deadline.
+-- wraps gpr_inf_future in the gRPC library.
+{#fun infinite_deadline as ^ {} -> `CTimeSpecPtr'#}
+
+withInfiniteDeadline :: (CTimeSpecPtr -> IO a) -> IO a
+withInfiniteDeadline = bracket infiniteDeadline timespecDestroy
diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs
index d17f0c8..337ca58 100644
--- a/tests/LowLevelTests.hs
+++ b/tests/LowLevelTests.hs
@@ -182,7 +182,7 @@ testGoaway =
         assertBool "Client handles server shutdown gracefully" $
           lastResult == unavailableStatus
           ||
-          lastResult == Left GRPCIOTimeout
+          lastResult == deadlineExceededStatus
       server s = do
         let rm = head (registeredMethods s)
         serverHandleNormalCall s rm 11 mempty dummyHandler
@@ -196,10 +196,7 @@ testSlowServer =
       client c = do
        rm <- clientRegisterMethod c "/foo" Normal
        result <- clientRequest c rm 1 "" mempty
-       assertBool "Client gets timeout or deadline exceeded" $
-         result == Left GRPCIOTimeout
-         ||
-         result == deadlineExceededStatus
+       result @?= deadlineExceededStatus
      server s = do
        let rm = head (registeredMethods s)
        serverHandleNormalCall s rm 1 mempty $ \_ _ _ -> do
diff --git a/tests/LowLevelTests/Op.hs b/tests/LowLevelTests/Op.hs
index f0b1e5b..ac454e0 100644
--- a/tests/LowLevelTests/Op.hs
+++ b/tests/LowLevelTests/Op.hs
@@ -35,7 +35,7 @@ testCancelWhileHandling =
       withOpArrayAndCtxts serverEmptyRecvOps $ \(opArray, ctxts) -> do
         tag <- newTag serverCQ
         startBatch serverCQ unServerCall opArray 3 tag
-        pluck serverCQ tag 1
+        pluck serverCQ tag (Just 1)
         let (OpRecvCloseOnServerContext pcancelled) = last ctxts
         cancelledBefore <- peek pcancelled
         cancelledBefore @?= 0
@@ -52,7 +52,7 @@ testCancelFromServer =
     withClientServerUnaryCall grpc $
      \(c@Client{..}, s@Server{..}, cc@ClientCall{..}, sc@ServerCall{..}) -> do
        serverCallCancel sc GrpcStatusPermissionDenied "TestStatus"
-       clientRes <- runOps unClientCall clientCQ clientRecvOps 1
+       clientRes <- runOps unClientCall clientCQ clientRecvOps
        case clientRes of
          Left x -> error $ "Client recv error: " ++ show x
          Right [_,_,OpRecvStatusOnClientResult _ code details] -> do
@@ -83,7 +83,7 @@ withClientServerUnaryCall grpc f = do
       -- because registered methods try to do recv ops immediately when
       -- created. If later we want to send payloads or metadata, we'll need
       -- to tweak this.
-      clientRes <- runOps (unClientCall cc) (clientCQ c) clientEmptySendOps 1
+      clientRes <- runOps (unClientCall cc) (clientCQ c) clientEmptySendOps
      withServerCall s srm 10 $ \sc ->
        f (c, s, cc, sc)
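
The API change running through this diff is that runOps drops its TimeoutSeconds argument and pluck now takes a Maybe TimeoutSeconds, where Nothing plucks against the new infinite deadline and leaves termination to the deadline already attached to the call. Below is a minimal sketch of how callers look after the change, assuming the module layout implied by the file paths above; the helper names sendEmptyBatch and waitForShutdownTag are illustrative and not part of the patch.

-- Sketch only: import paths follow the file layout in this diff and may not
-- match the library's public export list.
import Network.GRPC.LowLevel.CompletionQueue.Internal (CompletionQueue, pluck)
import Network.GRPC.LowLevel.GRPC (GRPCIOError)
import Network.GRPC.LowLevel.Op (Op (..), OpRecvResult, runOps)
import qualified Network.GRPC.Unsafe as C

-- Client-side batch: no timeout argument anymore. runOps plucks its tag with
-- Nothing, i.e. an infinite deadline, so a stuck batch is bounded only by the
-- deadline on the call itself.
sendEmptyBatch :: C.Call -> CompletionQueue
               -> IO (Either GRPCIOError [OpRecvResult])
sendEmptyBatch call cq =
  runOps call cq [OpSendInitialMetadata mempty, OpSendCloseFromClient]

-- Server-side plucks (serverRequestCall, shutdown notification) still pass an
-- explicit bound, now wrapped in Just.
waitForShutdownTag :: CompletionQueue -> C.Tag -> IO (Either GRPCIOError ())
waitForShutdownTag cq tag = pluck cq tag (Just 30)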