mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2025-01-12 20:19:47 +01:00
Fix race condition while tracking outstanding handler threads (#66)
* Demonstrate shutdown concurrency bug via unregistered server streaming test * Comments, cleanup * Remove extraneous liftIOs from low level tests
This commit is contained in:
parent
d618b90481
commit
d7b00ac054
3 changed files with 32 additions and 27 deletions
|
@ -8,6 +8,7 @@ GRPC
|
|||
, GRPCIOError(..)
|
||||
, throwIfCallError
|
||||
, grpcDebug
|
||||
, grpcDebug'
|
||||
, threadDelaySecs
|
||||
, C.MetadataMap(..)
|
||||
, StatusDetails(..)
|
||||
|
|
|
@ -80,14 +80,24 @@ forkServer :: Server -> IO () -> IO Bool
|
|||
forkServer Server{..} f = do
|
||||
shutdown <- readTVarIO serverShuttingDown
|
||||
case shutdown of
|
||||
True -> return False
|
||||
True -> return False
|
||||
False -> do
|
||||
tid <- forkFinally f cleanup
|
||||
atomically $ modifyTVar' outstandingForks (S.insert tid)
|
||||
-- NB: The spawned thread waits on 'ready' before running 'f' to ensure
|
||||
-- that its ThreadId is inserted into outstandingForks before the cleanup
|
||||
-- function deletes it from there. Not doing this can lead to stale thread
|
||||
-- ids in the set when handlers cleanup ahead of the insertion, and a
|
||||
-- subsequent deadlock in stopServer. We can use a dead-list instead if we
|
||||
-- need something more performant.
|
||||
ready <- newTVarIO False
|
||||
tid <- let act = do atomically (check =<< readTVar ready)
|
||||
f
|
||||
in forkFinally act cleanup
|
||||
atomically $ do
|
||||
modifyTVar' outstandingForks (S.insert tid)
|
||||
modifyTVar' ready (const True)
|
||||
#ifdef DEBUG
|
||||
currSet <- readTVarIO outstandingForks
|
||||
grpcDebug $ "forkServer: number of outstandingForks is "
|
||||
++ show (S.size currSet)
|
||||
tids <- readTVarIO outstandingForks
|
||||
grpcDebug $ "after fork and bookkeeping: outstandingForks=" ++ show tids
|
||||
#endif
|
||||
return True
|
||||
where cleanup _ = do
|
||||
|
@ -176,16 +186,17 @@ stopServer Server{ unsafeServer = s, .. } = do
|
|||
case shutdownEvent of
|
||||
-- This case occurs when we pluck but the queue is already in the
|
||||
-- 'shuttingDown' state, implying we already tried to shut down.
|
||||
(Left GRPCIOShutdown) -> error "Called stopServer twice!"
|
||||
(Left _) -> error "Failed to stop server."
|
||||
(Right _) -> return ()
|
||||
Left GRPCIOShutdown -> error "Called stopServer twice!"
|
||||
Left _ -> error "Failed to stop server."
|
||||
Right _ -> return ()
|
||||
cleanupForks = do
|
||||
atomically $ writeTVar serverShuttingDown True
|
||||
liveForks <- readTVarIO outstandingForks
|
||||
grpcDebug $ "Server shutdown: killing threads: " ++ show liveForks
|
||||
mapM_ killThread liveForks
|
||||
--wait for threads to shut down.
|
||||
atomically $ readTVar outstandingForks >>= (check . (==0) . S.size)
|
||||
-- wait for threads to shut down
|
||||
grpcDebug "Server shutdown: waiting until all threads are dead."
|
||||
atomically $ check . (==0) . S.size =<< readTVar outstandingForks
|
||||
grpcDebug "Server shutdown: All forks cleaned up."
|
||||
|
||||
-- Uses 'bracket' to safely start and stop a server, even if exceptions occur.
|
||||
|
|
|
@ -179,7 +179,7 @@ testServerStreaming =
|
|||
client c = do
|
||||
rm <- clientRegisterMethodServerStreaming c "/feed"
|
||||
eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do
|
||||
liftIO $ checkMD "Server initial metadata mismatch" serverInitMD initMD
|
||||
checkMD "Server initial metadata mismatch" serverInitMD initMD
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
eea @?= Right (dummyMeta, StatusOk, "dtls")
|
||||
|
@ -187,10 +187,8 @@ testServerStreaming =
|
|||
server s = do
|
||||
let rm = head (sstreamingMethods s)
|
||||
r <- serverWriter s rm serverInitMD $ \sc send -> do
|
||||
liftIO $ do
|
||||
checkMD "Server request metadata mismatch"
|
||||
clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
checkMD "Server request metadata mismatch" clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
forM_ pays $ \p -> send p `is` Right ()
|
||||
return (dummyMeta, StatusOk, "dtls")
|
||||
r @?= Right ()
|
||||
|
@ -211,17 +209,15 @@ testServerStreamingUnregistered =
|
|||
client c = do
|
||||
rm <- clientRegisterMethodServerStreaming c "/feed"
|
||||
eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do
|
||||
liftIO $ checkMD "Server initial metadata mismatch" serverInitMD initMD
|
||||
checkMD "Server initial metadata mismatch" serverInitMD initMD
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
eea @?= Right (dummyMeta, StatusOk, "dtls")
|
||||
|
||||
server s = U.withServerCallAsync s $ \call -> do
|
||||
r <- U.serverWriter s call serverInitMD $ \sc send -> do
|
||||
liftIO $ do
|
||||
checkMD "Server request metadata mismatch"
|
||||
clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
checkMD "Server request metadata mismatch" clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
forM_ pays $ \p -> send p `is` Right ()
|
||||
return (dummyMeta, StatusOk, "dtls")
|
||||
r @?= Right ()
|
||||
|
@ -248,8 +244,7 @@ testClientStreaming =
|
|||
server s = do
|
||||
let rm = head (cstreamingMethods s)
|
||||
eea <- serverReader s rm serverInitMD $ \sc recv -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (metadata sc)
|
||||
checkMD "Client request metadata mismatch" clientInitMD (metadata sc)
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
return (Just serverRsp, trailMD, serverStatus, serverDtls)
|
||||
|
@ -276,8 +271,7 @@ testClientStreamingUnregistered =
|
|||
|
||||
server s = U.withServerCallAsync s $ \call -> do
|
||||
eea <- U.serverReader s call serverInitMD $ \sc recv -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (metadata sc)
|
||||
checkMD "Client request metadata mismatch" clientInitMD (metadata sc)
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
return (Just serverRsp, trailMD, serverStatus, serverDtls)
|
||||
|
@ -308,8 +302,7 @@ testBiDiStreaming =
|
|||
server s = do
|
||||
let rm = head (bidiStreamingMethods s)
|
||||
eea <- serverRW s rm serverInitMD $ \sc recv send -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (metadata sc)
|
||||
checkMD "Client request metadata mismatch" clientInitMD (metadata sc)
|
||||
recv `is` Right (Just "cw0")
|
||||
send "sw0" `is` Right ()
|
||||
recv `is` Right (Just "cw1")
|
||||
|
|
Loading…
Reference in a new issue