diff --git a/src/Network/GRPC/LowLevel/GRPC.hs b/src/Network/GRPC/LowLevel/GRPC.hs index 24457b6..15a7e07 100644 --- a/src/Network/GRPC/LowLevel/GRPC.hs +++ b/src/Network/GRPC/LowLevel/GRPC.hs @@ -8,6 +8,7 @@ GRPC , GRPCIOError(..) , throwIfCallError , grpcDebug +, grpcDebug' , threadDelaySecs , C.MetadataMap(..) , StatusDetails(..) diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 799b2eb..3ced246 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -80,14 +80,24 @@ forkServer :: Server -> IO () -> IO Bool forkServer Server{..} f = do shutdown <- readTVarIO serverShuttingDown case shutdown of - True -> return False + True -> return False False -> do - tid <- forkFinally f cleanup - atomically $ modifyTVar' outstandingForks (S.insert tid) + -- NB: The spawned thread waits on 'ready' before running 'f' to ensure + -- that its ThreadId is inserted into outstandingForks before the cleanup + -- function deletes it from there. Not doing this can lead to stale thread + -- ids in the set when handlers cleanup ahead of the insertion, and a + -- subsequent deadlock in stopServer. We can use a dead-list instead if we + -- need something more performant. + ready <- newTVarIO False + tid <- let act = do atomically (check =<< readTVar ready) + f + in forkFinally act cleanup + atomically $ do + modifyTVar' outstandingForks (S.insert tid) + modifyTVar' ready (const True) #ifdef DEBUG - currSet <- readTVarIO outstandingForks - grpcDebug $ "forkServer: number of outstandingForks is " - ++ show (S.size currSet) + tids <- readTVarIO outstandingForks + grpcDebug $ "after fork and bookkeeping: outstandingForks=" ++ show tids #endif return True where cleanup _ = do @@ -176,16 +186,17 @@ stopServer Server{ unsafeServer = s, .. } = do case shutdownEvent of -- This case occurs when we pluck but the queue is already in the -- 'shuttingDown' state, implying we already tried to shut down. - (Left GRPCIOShutdown) -> error "Called stopServer twice!" - (Left _) -> error "Failed to stop server." - (Right _) -> return () + Left GRPCIOShutdown -> error "Called stopServer twice!" + Left _ -> error "Failed to stop server." + Right _ -> return () cleanupForks = do atomically $ writeTVar serverShuttingDown True liveForks <- readTVarIO outstandingForks grpcDebug $ "Server shutdown: killing threads: " ++ show liveForks mapM_ killThread liveForks - --wait for threads to shut down. - atomically $ readTVar outstandingForks >>= (check . (==0) . S.size) + -- wait for threads to shut down + grpcDebug "Server shutdown: waiting until all threads are dead." + atomically $ check . (==0) . S.size =<< readTVar outstandingForks grpcDebug "Server shutdown: All forks cleaned up." -- Uses 'bracket' to safely start and stop a server, even if exceptions occur. diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index e6e91e6..fb10593 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -179,7 +179,7 @@ testServerStreaming = client c = do rm <- clientRegisterMethodServerStreaming c "/feed" eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do - liftIO $ checkMD "Server initial metadata mismatch" serverInitMD initMD + checkMD "Server initial metadata mismatch" serverInitMD initMD forM_ pays $ \p -> recv `is` Right (Just p) recv `is` Right Nothing eea @?= Right (dummyMeta, StatusOk, "dtls") @@ -187,10 +187,8 @@ testServerStreaming = server s = do let rm = head (sstreamingMethods s) r <- serverWriter s rm serverInitMD $ \sc send -> do - liftIO $ do - checkMD "Server request metadata mismatch" - clientInitMD (metadata sc) - payload sc @?= clientPay + checkMD "Server request metadata mismatch" clientInitMD (metadata sc) + payload sc @?= clientPay forM_ pays $ \p -> send p `is` Right () return (dummyMeta, StatusOk, "dtls") r @?= Right () @@ -211,17 +209,15 @@ testServerStreamingUnregistered = client c = do rm <- clientRegisterMethodServerStreaming c "/feed" eea <- clientReader c rm 10 clientPay clientInitMD $ \initMD recv -> do - liftIO $ checkMD "Server initial metadata mismatch" serverInitMD initMD + checkMD "Server initial metadata mismatch" serverInitMD initMD forM_ pays $ \p -> recv `is` Right (Just p) recv `is` Right Nothing eea @?= Right (dummyMeta, StatusOk, "dtls") server s = U.withServerCallAsync s $ \call -> do r <- U.serverWriter s call serverInitMD $ \sc send -> do - liftIO $ do - checkMD "Server request metadata mismatch" - clientInitMD (metadata sc) - payload sc @?= clientPay + checkMD "Server request metadata mismatch" clientInitMD (metadata sc) + payload sc @?= clientPay forM_ pays $ \p -> send p `is` Right () return (dummyMeta, StatusOk, "dtls") r @?= Right () @@ -248,8 +244,7 @@ testClientStreaming = server s = do let rm = head (cstreamingMethods s) eea <- serverReader s rm serverInitMD $ \sc recv -> do - liftIO $ checkMD "Client request metadata mismatch" - clientInitMD (metadata sc) + checkMD "Client request metadata mismatch" clientInitMD (metadata sc) forM_ pays $ \p -> recv `is` Right (Just p) recv `is` Right Nothing return (Just serverRsp, trailMD, serverStatus, serverDtls) @@ -276,8 +271,7 @@ testClientStreamingUnregistered = server s = U.withServerCallAsync s $ \call -> do eea <- U.serverReader s call serverInitMD $ \sc recv -> do - liftIO $ checkMD "Client request metadata mismatch" - clientInitMD (metadata sc) + checkMD "Client request metadata mismatch" clientInitMD (metadata sc) forM_ pays $ \p -> recv `is` Right (Just p) recv `is` Right Nothing return (Just serverRsp, trailMD, serverStatus, serverDtls) @@ -308,8 +302,7 @@ testBiDiStreaming = server s = do let rm = head (bidiStreamingMethods s) eea <- serverRW s rm serverInitMD $ \sc recv send -> do - liftIO $ checkMD "Client request metadata mismatch" - clientInitMD (metadata sc) + checkMD "Client request metadata mismatch" clientInitMD (metadata sc) recv `is` Right (Just "cw0") send "sw0" `is` Right () recv `is` Right (Just "cw1")