gRPC-haskell/core/src/Network/GRPC/LowLevel/Server/Unregistered.hs
Gabriel Gonzalez d532cec4d1
Make server process killable (#105)
35163c3 introduced a new use of `mask` which makes the server
process uninterruptible while waiting for a new incoming request.
This change fixes that by surrounding the logic that waits for a
new request with `unmask`.  This new `unmask` should still
respect the finalization guarantees of the surrounding masked
code.
2020-05-28 13:56:03 -04:00

144 lines
6.2 KiB
Haskell

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.Server.Unregistered where
import Control.Exception (bracket, finally, mask)
import Control.Monad
import Control.Monad.Trans.Except
import Data.ByteString (ByteString)
import Network.GRPC.LowLevel.Call.Unregistered
import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall)
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
import Network.GRPC.LowLevel.Server (Server (..),
ServerRWHandlerLL,
ServerReaderHandlerLL,
ServerWriterHandlerLL,
forkServer,
serverReader',
serverWriter',
serverRW')
import qualified Network.GRPC.Unsafe.Op as C
serverCreateCall :: Server
-> IO (Either GRPCIOError ServerCall)
serverCreateCall Server{..} =
serverRequestCall unsafeServer serverCQ serverCallCQ
withServerCall :: Server
-> (ServerCall -> IO (Either GRPCIOError a))
-> IO (Either GRPCIOError a)
withServerCall s f =
bracket (serverCreateCall s) cleanup $ \case
Left e -> return (Left e)
Right c -> f c
where
cleanup (Left _) = pure ()
cleanup (Right c) = do
grpcDebug "withServerCall: destroying."
destroyServerCall c
-- | Gets a call and then forks the given function on a new thread, with the
-- new call as input. Blocks until a call is received, then returns immediately.
-- Handles cleaning up the call safely.
-- Because this function doesn't wait for the handler to return, it cannot
-- return errors.
withServerCallAsync :: Server
-> (ServerCall -> IO ())
-> IO ()
withServerCallAsync s f = mask $ \unmask ->
unmask (serverCreateCall s) >>= \case
Left e -> do grpcDebug $ "withServerCallAsync: call error: " ++ show e
return ()
Right c -> do wasForkSuccess <- forkServer s handler
unless wasForkSuccess destroy
where handler = unmask (f c) `finally` destroy
-- TODO: We sometimes never finish cleanup if the server
-- is shutting down and calls killThread. This causes gRPC
-- core to complain about leaks. I think the cause of
-- this is that killThread gets called after we are
-- already in destroyServerCall, and wrapping
-- uninterruptibleMask doesn't seem to help. Doesn't
-- crash, but does emit annoying log messages.
destroy = do
grpcDebug "withServerCallAsync: destroying."
destroyServerCall c
grpcDebug "withServerCallAsync: cleanup finished."
-- | A handler for an unregistered server call; bytestring arguments are the
-- request body and response body respectively.
type ServerHandler
= ServerCall
-> ByteString
-> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails)
-- | Handle one unregistered call.
serverHandleNormalCall :: Server
-> MetadataMap -- ^ Initial server metadata.
-> ServerHandler
-> IO (Either GRPCIOError ())
serverHandleNormalCall s initMeta f =
withServerCall s $ \c -> serverHandleNormalCall' s c initMeta f
serverHandleNormalCall' :: Server
-> ServerCall
-> MetadataMap -- ^ Initial server metadata.
-> ServerHandler
-> IO (Either GRPCIOError ())
serverHandleNormalCall'
_ sc@ServerCall{ unsafeSC = c, callCQ = cq, .. } initMeta f = do
grpcDebug "serverHandleNormalCall(U): starting batch."
runOps c cq
[ OpSendInitialMetadata initMeta
, OpRecvMessage
]
>>= \case
Left x -> do
grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
return $ Left x
Right [OpRecvMessageResult (Just body)] -> do
grpcDebug $ "got client metadata: " ++ show metadata
grpcDebug $ "call_details host is: " ++ show callHost
(rsp, trailMeta, st, ds) <- f sc body
-- TODO: We have to put 'OpRecvCloseOnServer' in the response ops,
-- or else the client times out. Given this, I have no idea how to
-- check for cancellation on the server.
runOps c cq
[ OpRecvCloseOnServer
, OpSendMessage rsp,
OpSendStatusFromServer trailMeta st ds
]
>>= \case
Left x -> do
grpcDebug "serverHandleNormalCall(U): resp failed."
return $ Left x
Right _ -> do
grpcDebug "serverHandleNormalCall(U): ops done."
return $ Right ()
x -> error $ "impossible pattern match: " ++ show x
serverReader :: Server
-> ServerCall
-> MetadataMap -- ^ Initial server metadata
-> ServerReaderHandlerLL
-> IO (Either GRPCIOError ())
serverReader s = serverReader' s . convertCall
serverWriter :: Server
-> ServerCall
-> MetadataMap -- ^ Initial server metadata
-> ServerWriterHandlerLL
-> IO (Either GRPCIOError ())
serverWriter s sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
runExceptT $ do
bs <- recvInitialMessage c ccq
ExceptT (serverWriter' s (const bs <$> convertCall sc) initMeta f)
serverRW :: Server
-> ServerCall
-> MetadataMap -- ^ Initial server metadata
-> ServerRWHandlerLL
-> IO (Either GRPCIOError ())
serverRW s = serverRW' s . convertCall