mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-11-26 21:19:43 +01:00
Fix async race causing crash on server shutdown (#43)
* clean up forked threads before shutting down server * get rid of sleep, wait until all forked threads are dead to shutdown server * add todo
This commit is contained in:
parent
99e6f0652d
commit
65026eb79c
3 changed files with 82 additions and 11 deletions
|
@ -80,7 +80,7 @@ library
|
||||||
, grpc/impl/codegen/slice.h
|
, grpc/impl/codegen/slice.h
|
||||||
build-tools: c2hs
|
build-tools: c2hs
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind
|
ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind -threaded
|
||||||
include-dirs: include
|
include-dirs: include
|
||||||
hs-source-dirs: src
|
hs-source-dirs: src
|
||||||
default-extensions: CPP
|
default-extensions: CPP
|
||||||
|
|
|
@ -14,10 +14,24 @@
|
||||||
-- `Network.GRPC.LowLevel.Server.Unregistered`.
|
-- `Network.GRPC.LowLevel.Server.Unregistered`.
|
||||||
module Network.GRPC.LowLevel.Server where
|
module Network.GRPC.LowLevel.Server where
|
||||||
|
|
||||||
|
import Control.Concurrent (ThreadId
|
||||||
|
, forkFinally
|
||||||
|
, myThreadId
|
||||||
|
, killThread)
|
||||||
|
import Control.Concurrent.STM (atomically
|
||||||
|
, check)
|
||||||
|
import Control.Concurrent.STM.TVar (TVar
|
||||||
|
, modifyTVar'
|
||||||
|
, readTVar
|
||||||
|
, writeTVar
|
||||||
|
, readTVarIO
|
||||||
|
, newTVarIO)
|
||||||
import Control.Exception (bracket, finally)
|
import Control.Exception (bracket, finally)
|
||||||
import Control.Monad
|
import Control.Monad hiding (mapM_)
|
||||||
import Control.Monad.Trans.Except
|
import Control.Monad.Trans.Except
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString)
|
||||||
|
import Data.Foldable (mapM_)
|
||||||
|
import qualified Data.Set as S
|
||||||
import Network.GRPC.LowLevel.Call
|
import Network.GRPC.LowLevel.Call
|
||||||
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
|
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
|
||||||
createCompletionQueue,
|
createCompletionQueue,
|
||||||
|
@ -42,8 +56,41 @@ data Server = Server
|
||||||
, cstreamingMethods :: [RegisteredMethod 'ClientStreaming]
|
, cstreamingMethods :: [RegisteredMethod 'ClientStreaming]
|
||||||
, bidiStreamingMethods :: [RegisteredMethod 'BiDiStreaming]
|
, bidiStreamingMethods :: [RegisteredMethod 'BiDiStreaming]
|
||||||
, serverConfig :: ServerConfig
|
, serverConfig :: ServerConfig
|
||||||
|
, outstandingForks :: TVar (S.Set ThreadId)
|
||||||
|
, serverShuttingDown :: TVar Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
-- TODO: should we make a forkGRPC function instead? I am not sure if it would
|
||||||
|
-- be safe to let the call handlers threads keep running after the server stops,
|
||||||
|
-- so I'm taking the more conservative route of ensuring the server will
|
||||||
|
-- stay alive. Experiment more when time permits.
|
||||||
|
-- | Fork a thread from the server (presumably for a handler) with the guarantee
|
||||||
|
-- that the server won't shut down while the thread is alive.
|
||||||
|
-- Returns true if the fork happens successfully, and false if the server is
|
||||||
|
-- already shutting down (in which case the function to fork is never executed).
|
||||||
|
-- If the thread stays alive too long while the server is trying to shut down,
|
||||||
|
-- the thread will be killed with 'killThread'.
|
||||||
|
-- The purpose of this is to prevent memory access
|
||||||
|
-- errors at the C level of the library, not to ensure that application layer
|
||||||
|
-- operations in user code complete successfully.
|
||||||
|
forkServer :: Server -> IO () -> IO Bool
|
||||||
|
forkServer Server{..} f = do
|
||||||
|
shutdown <- readTVarIO serverShuttingDown
|
||||||
|
case shutdown of
|
||||||
|
True -> return False
|
||||||
|
False -> do
|
||||||
|
tid <- forkFinally f cleanup
|
||||||
|
atomically $ modifyTVar' outstandingForks (S.insert tid)
|
||||||
|
#ifdef DEBUG
|
||||||
|
currSet <- readTVarIO outstandingForks
|
||||||
|
grpcDebug $ "forkServer: number of outstandingForks is "
|
||||||
|
++ show (S.size currSet)
|
||||||
|
#endif
|
||||||
|
return True
|
||||||
|
where cleanup _ = do
|
||||||
|
tid <- myThreadId
|
||||||
|
atomically $ modifyTVar' outstandingForks (S.delete tid)
|
||||||
|
|
||||||
-- | Configuration needed to start a server.
|
-- | Configuration needed to start a server.
|
||||||
data ServerConfig = ServerConfig
|
data ServerConfig = ServerConfig
|
||||||
{ host :: Host
|
{ host :: Host
|
||||||
|
@ -90,15 +137,18 @@ startServer grpc conf@ServerConfig{..} =
|
||||||
bs <- mapM (\nm -> serverRegisterMethodBiDiStreaming server nm e)
|
bs <- mapM (\nm -> serverRegisterMethodBiDiStreaming server nm e)
|
||||||
methodsToRegisterBiDiStreaming
|
methodsToRegisterBiDiStreaming
|
||||||
C.grpcServerStart server
|
C.grpcServerStart server
|
||||||
return $ Server grpc server cq ns ss cs bs conf
|
forks <- newTVarIO S.empty
|
||||||
|
shutdown <- newTVarIO False
|
||||||
|
return $ Server grpc server cq ns ss cs bs conf forks shutdown
|
||||||
|
|
||||||
stopServer :: Server -> IO ()
|
stopServer :: Server -> IO ()
|
||||||
-- TODO: Do method handles need to be freed?
|
-- TODO: Do method handles need to be freed?
|
||||||
stopServer Server{ unsafeServer = s, serverCQ = scq } = do
|
stopServer Server{ unsafeServer = s, serverCQ = scq, .. } = do
|
||||||
grpcDebug "stopServer: calling shutdownNotify."
|
grpcDebug "stopServer: calling shutdownNotify."
|
||||||
shutdownNotify
|
shutdownNotify
|
||||||
grpcDebug "stopServer: cancelling all calls."
|
grpcDebug "stopServer: cancelling all calls."
|
||||||
C.grpcServerCancelAllCalls s
|
C.grpcServerCancelAllCalls s
|
||||||
|
cleanupForks
|
||||||
grpcDebug "stopServer: call grpc_server_destroy."
|
grpcDebug "stopServer: call grpc_server_destroy."
|
||||||
C.grpcServerDestroy s
|
C.grpcServerDestroy s
|
||||||
grpcDebug "stopServer: shutting down CQ."
|
grpcDebug "stopServer: shutting down CQ."
|
||||||
|
@ -122,6 +172,14 @@ stopServer Server{ unsafeServer = s, serverCQ = scq } = do
|
||||||
(Left GRPCIOShutdown) -> error "Called stopServer twice!"
|
(Left GRPCIOShutdown) -> error "Called stopServer twice!"
|
||||||
(Left _) -> error "Failed to stop server."
|
(Left _) -> error "Failed to stop server."
|
||||||
(Right _) -> return ()
|
(Right _) -> return ()
|
||||||
|
cleanupForks = do
|
||||||
|
atomically $ writeTVar serverShuttingDown True
|
||||||
|
liveForks <- readTVarIO outstandingForks
|
||||||
|
grpcDebug $ "Server shutdown: killing threads: " ++ show liveForks
|
||||||
|
mapM_ killThread liveForks
|
||||||
|
--wait for threads to shut down.
|
||||||
|
atomically $ readTVar outstandingForks >>= (check . (==0) . S.size)
|
||||||
|
grpcDebug "Server shutdown: All forks cleaned up."
|
||||||
|
|
||||||
-- Uses 'bracket' to safely start and stop a server, even if exceptions occur.
|
-- Uses 'bracket' to safely start and stop a server, even if exceptions occur.
|
||||||
withServer :: GRPC -> ServerConfig -> (Server -> IO a) -> IO a
|
withServer :: GRPC -> ServerConfig -> (Server -> IO a) -> IO a
|
||||||
|
|
|
@ -3,9 +3,7 @@
|
||||||
|
|
||||||
module Network.GRPC.LowLevel.Server.Unregistered where
|
module Network.GRPC.LowLevel.Server.Unregistered where
|
||||||
|
|
||||||
import Control.Concurrent (forkIO)
|
|
||||||
import Control.Exception (finally)
|
import Control.Exception (finally)
|
||||||
import Control.Monad
|
|
||||||
import Control.Monad.Trans.Except
|
import Control.Monad.Trans.Except
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString)
|
||||||
import Network.GRPC.LowLevel.Call.Unregistered
|
import Network.GRPC.LowLevel.Call.Unregistered
|
||||||
|
@ -25,7 +23,8 @@ import Network.GRPC.LowLevel.Op (Op (..)
|
||||||
import Network.GRPC.LowLevel.Server (Server (..)
|
import Network.GRPC.LowLevel.Server (Server (..)
|
||||||
, ServerReaderHandlerLL
|
, ServerReaderHandlerLL
|
||||||
, ServerWriterHandlerLL
|
, ServerWriterHandlerLL
|
||||||
, ServerRWHandlerLL)
|
, ServerRWHandlerLL
|
||||||
|
, forkServer)
|
||||||
import qualified Network.GRPC.Unsafe.Op as C
|
import qualified Network.GRPC.Unsafe.Op as C
|
||||||
|
|
||||||
serverCreateCall :: Server
|
serverCreateCall :: Server
|
||||||
|
@ -54,10 +53,24 @@ withServerCallAsync :: Server
|
||||||
-> IO ()
|
-> IO ()
|
||||||
withServerCallAsync s f =
|
withServerCallAsync s f =
|
||||||
serverCreateCall s >>= \case
|
serverCreateCall s >>= \case
|
||||||
Left _ -> return ()
|
Left e -> do grpcDebug $ "withServerCallAsync: call error: " ++ show e
|
||||||
Right c -> void $ forkIO (f c `finally` do
|
return ()
|
||||||
|
Right c -> do wasForkSuccess <- forkServer s handler
|
||||||
|
if wasForkSuccess
|
||||||
|
then return ()
|
||||||
|
else destroy
|
||||||
|
where handler = f c `finally` destroy
|
||||||
|
--TODO: We sometimes never finish cleanup if the server
|
||||||
|
-- is shutting down and calls killThread. This causes
|
||||||
|
-- gRPC core to complain about leaks.
|
||||||
|
-- I think the cause of this is that killThread gets
|
||||||
|
-- called after we are already in destroyServerCall,
|
||||||
|
-- and wrapping uninterruptibleMask doesn't seem to help.
|
||||||
|
-- Doesn't crash, but does emit annoying log messages.
|
||||||
|
destroy = do
|
||||||
grpcDebug "withServerCallAsync: destroying."
|
grpcDebug "withServerCallAsync: destroying."
|
||||||
destroyServerCall c)
|
destroyServerCall c
|
||||||
|
grpcDebug "withServerCallAsync: cleanup finished."
|
||||||
|
|
||||||
-- | Sequence of 'Op's needed to receive a normal (non-streaming) call.
|
-- | Sequence of 'Op's needed to receive a normal (non-streaming) call.
|
||||||
-- TODO: We have to put 'OpRecvCloseOnServer' in the response ops, or else the
|
-- TODO: We have to put 'OpRecvCloseOnServer' in the response ops, or else the
|
||||||
|
|
Loading…
Reference in a new issue