From 65026eb79c1fdfee083050efe21ee0998c74146a Mon Sep 17 00:00:00 2001 From: Connor Clark Date: Mon, 18 Jul 2016 09:04:01 -0700 Subject: [PATCH] Fix async race causing crash on server shutdown (#43) * clean up forked threads before shutting down server * get rid of sleep, wait until all forked threads are dead to shutdown server * add todo --- grpc-haskell.cabal | 2 +- src/Network/GRPC/LowLevel/Server.hs | 64 ++++++++++++++++++- .../GRPC/LowLevel/Server/Unregistered.hs | 27 ++++++-- 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/grpc-haskell.cabal b/grpc-haskell.cabal index ecb9d4a..69e0b95 100644 --- a/grpc-haskell.cabal +++ b/grpc-haskell.cabal @@ -80,7 +80,7 @@ library , grpc/impl/codegen/slice.h build-tools: c2hs default-language: Haskell2010 - ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind + ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind -threaded include-dirs: include hs-source-dirs: src default-extensions: CPP diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 435f7cb..e15d72f 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -14,10 +14,24 @@ -- `Network.GRPC.LowLevel.Server.Unregistered`. 
module Network.GRPC.LowLevel.Server where +import Control.Concurrent (ThreadId + , forkFinally + , myThreadId + , killThread) +import Control.Concurrent.STM (atomically + , check) +import Control.Concurrent.STM.TVar (TVar + , modifyTVar' + , readTVar + , writeTVar + , readTVarIO + , newTVarIO) import Control.Exception (bracket, finally) -import Control.Monad +import Control.Monad hiding (mapM_) import Control.Monad.Trans.Except import Data.ByteString (ByteString) +import Data.Foldable (mapM_) +import qualified Data.Set as S import Network.GRPC.LowLevel.Call import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue, createCompletionQueue, @@ -42,8 +56,41 @@ data Server = Server , cstreamingMethods :: [RegisteredMethod 'ClientStreaming] , bidiStreamingMethods :: [RegisteredMethod 'BiDiStreaming] , serverConfig :: ServerConfig + , outstandingForks :: TVar (S.Set ThreadId) + , serverShuttingDown :: TVar Bool } +-- TODO: should we make a forkGRPC function instead? I am not sure if it would +-- be safe to let the call handlers threads keep running after the server stops, +-- so I'm taking the more conservative route of ensuring the server will +-- stay alive. Experiment more when time permits. +-- | Fork a thread from the server (presumably for a handler) with the guarantee +-- that the server won't shut down while the thread is alive. +-- Returns true if the fork happens successfully, and false if the server is +-- already shutting down (in which case the function to fork is never executed). +-- If the thread stays alive too long while the server is trying to shut down, +-- the thread will be killed with 'killThread'. +-- The purpose of this is to prevent memory access +-- errors at the C level of the library, not to ensure that application layer +-- operations in user code complete successfully. 
+forkServer :: Server -> IO () -> IO Bool
+forkServer Server{..} f = do
+  -- 'gate' carries the parent's verdict to the child: Nothing while undecided,
+  -- Just True to run the handler, Just False to exit without running it.
+  gate <- newTVarIO Nothing
+  -- Fork first so the ThreadId is known, but keep the child parked on 'gate'.
+  -- The child therefore cannot reach 'cleanup' before it has been registered;
+  -- otherwise a fast handler could delete its id before the insert below and
+  -- leave a stale entry that 'stopServer' would wait on forever.
+  tid <- forkFinally (awaitVerdict gate >>= \run -> when run f) cleanup
+  -- Check the shutdown flag and register the thread in ONE transaction, so
+  -- 'stopServer' cannot set the flag and snapshot 'outstandingForks' between
+  -- the check and the insert (which would let a handler outlive the server).
+  accepted <- atomically $ do
+    shuttingDown <- readTVar serverShuttingDown
+    unless shuttingDown $ modifyTVar' outstandingForks (S.insert tid)
+    writeTVar gate (Just (not shuttingDown))
+    return (not shuttingDown)
+-- CPP directives are kept in column 0, as traditional-mode cpp expects.
+#ifdef DEBUG
+  when accepted $ do
+    currSet <- readTVarIO outstandingForks
+    grpcDebug $ "forkServer: number of outstandingForks is "
+                ++ show (S.size currSet)
+#endif
+  return accepted
+  where
+    -- Block until the parent has published its verdict, then return whether
+    -- the handler should run.
+    awaitVerdict gate = atomically $ do
+      verdict <- readTVar gate
+      check (verdict /= Nothing)
+      return (verdict == Just True)
+    -- Runs when the forked thread ends for any reason (normal return,
+    -- exception, or 'killThread'). Deleting an id that was never inserted
+    -- (the rejected case) is a harmless no-op.
+    cleanup _ = do
+      tid <- myThreadId
+      atomically $ modifyTVar' outstandingForks (S.delete tid)
+
 -- | Configuration needed to start a server. data ServerConfig = ServerConfig { host :: Host @@ -90,15 +137,18 @@ startServer grpc conf@ServerConfig{..} = bs <- mapM (\nm -> serverRegisterMethodBiDiStreaming server nm e) methodsToRegisterBiDiStreaming C.grpcServerStart server - return $ Server grpc server cq ns ss cs bs conf + forks <- newTVarIO S.empty + shutdown <- newTVarIO False + return $ Server grpc server cq ns ss cs bs conf forks shutdown stopServer :: Server -> IO () -- TODO: Do method handles need to be freed? -stopServer Server{ unsafeServer = s, serverCQ = scq } = do +stopServer Server{ unsafeServer = s, serverCQ = scq, .. } = do grpcDebug "stopServer: calling shutdownNotify." shutdownNotify grpcDebug "stopServer: cancelling all calls." C.grpcServerCancelAllCalls s + cleanupForks grpcDebug "stopServer: call grpc_server_destroy." C.grpcServerDestroy s grpcDebug "stopServer: shutting down CQ." @@ -122,6 +172,14 @@ stopServer Server{ unsafeServer = s, serverCQ = scq } = do (Left GRPCIOShutdown) -> error "Called stopServer twice!" (Left _) -> error "Failed to stop server." (Right _) -> return () + cleanupForks = do + atomically $ writeTVar serverShuttingDown True + liveForks <- readTVarIO outstandingForks + grpcDebug $ "Server shutdown: killing threads: " ++ show liveForks + mapM_ killThread liveForks + --wait for threads to shut down. + atomically $ readTVar outstandingForks >>= (check . (==0) .
S.size) + grpcDebug "Server shutdown: All forks cleaned up." -- Uses 'bracket' to safely start and stop a server, even if exceptions occur. withServer :: GRPC -> ServerConfig -> (Server -> IO a) -> IO a diff --git a/src/Network/GRPC/LowLevel/Server/Unregistered.hs b/src/Network/GRPC/LowLevel/Server/Unregistered.hs index 42c79f8..22fcc72 100644 --- a/src/Network/GRPC/LowLevel/Server/Unregistered.hs +++ b/src/Network/GRPC/LowLevel/Server/Unregistered.hs @@ -3,9 +3,7 @@ module Network.GRPC.LowLevel.Server.Unregistered where -import Control.Concurrent (forkIO) import Control.Exception (finally) -import Control.Monad import Control.Monad.Trans.Except import Data.ByteString (ByteString) import Network.GRPC.LowLevel.Call.Unregistered @@ -25,7 +23,8 @@ import Network.GRPC.LowLevel.Op (Op (..) import Network.GRPC.LowLevel.Server (Server (..) , ServerReaderHandlerLL , ServerWriterHandlerLL - , ServerRWHandlerLL) + , ServerRWHandlerLL + , forkServer) import qualified Network.GRPC.Unsafe.Op as C serverCreateCall :: Server @@ -54,10 +53,24 @@ withServerCallAsync :: Server -> IO () withServerCallAsync s f = serverCreateCall s >>= \case - Left _ -> return () - Right c -> void $ forkIO (f c `finally` do - grpcDebug "withServerCallAsync: destroying." - destroyServerCall c) + Left e -> do grpcDebug $ "withServerCallAsync: call error: " ++ show e + return () + Right c -> do wasForkSuccess <- forkServer s handler + if wasForkSuccess + then return () + else destroy + where handler = f c `finally` destroy + --TODO: We sometimes never finish cleanup if the server + -- is shutting down and calls killThread. This causes + -- gRPC core to complain about leaks. + -- I think the cause of this is that killThread gets + -- called after we are already in destroyServerCall, + -- and wrapping uninterruptibleMask doesn't seem to help. + -- Doesn't crash, but does emit annoying log messages. + destroy = do + grpcDebug "withServerCallAsync: destroying." 
+ destroyServerCall c + grpcDebug "withServerCallAsync: cleanup finished." -- | Sequence of 'Op's needed to receive a normal (non-streaming) call. -- TODO: We have to put 'OpRecvCloseOnServer' in the response ops, or else the