mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-06-18 02:28:35 +02:00
d4a80a9a4e
35163c3c18
introduced a new use of `mask` which makes the server process uninterruptible
while waiting for a new incoming request. This change fixes that by
surrounding the logic that waits for a new request with `unmask`. This
new `unmask` should still respect the finalization guarantees of the
surrounding masked code.
144 lines
6.2 KiB
Haskell
144 lines
6.2 KiB
Haskell
{-# LANGUAGE LambdaCase #-}
|
|
{-# LANGUAGE RecordWildCards #-}
|
|
|
|
module Network.GRPC.LowLevel.Server.Unregistered where
|
|
|
|
import Control.Exception (bracket, finally, mask)
|
|
import Control.Monad
|
|
import Control.Monad.Trans.Except
|
|
import Data.ByteString (ByteString)
|
|
import Network.GRPC.LowLevel.Call.Unregistered
|
|
import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall)
|
|
import Network.GRPC.LowLevel.GRPC
|
|
import Network.GRPC.LowLevel.Op
|
|
import Network.GRPC.LowLevel.Server (Server (..),
|
|
ServerRWHandlerLL,
|
|
ServerReaderHandlerLL,
|
|
ServerWriterHandlerLL,
|
|
forkServer,
|
|
serverReader',
|
|
serverWriter',
|
|
serverRW')
|
|
import qualified Network.GRPC.Unsafe.Op as C
|
|
|
|
serverCreateCall :: Server
|
|
-> IO (Either GRPCIOError ServerCall)
|
|
serverCreateCall Server{..} =
|
|
serverRequestCall unsafeServer serverCQ serverCallCQ
|
|
|
|
withServerCall :: Server
|
|
-> (ServerCall -> IO (Either GRPCIOError a))
|
|
-> IO (Either GRPCIOError a)
|
|
withServerCall s f =
|
|
bracket (serverCreateCall s) cleanup $ \case
|
|
Left e -> return (Left e)
|
|
Right c -> f c
|
|
where
|
|
cleanup (Left _) = pure ()
|
|
cleanup (Right c) = do
|
|
grpcDebug "withServerCall: destroying."
|
|
destroyServerCall c
|
|
|
|
-- | Gets a call and then forks the given function on a new thread, with the
|
|
-- new call as input. Blocks until a call is received, then returns immediately.
|
|
-- Handles cleaning up the call safely.
|
|
-- Because this function doesn't wait for the handler to return, it cannot
|
|
-- return errors.
|
|
withServerCallAsync :: Server
|
|
-> (ServerCall -> IO ())
|
|
-> IO ()
|
|
withServerCallAsync s f = mask $ \unmask ->
|
|
unmask (serverCreateCall s) >>= \case
|
|
Left e -> do grpcDebug $ "withServerCallAsync: call error: " ++ show e
|
|
return ()
|
|
Right c -> do wasForkSuccess <- forkServer s handler
|
|
unless wasForkSuccess destroy
|
|
where handler = unmask (f c) `finally` destroy
|
|
-- TODO: We sometimes never finish cleanup if the server
|
|
-- is shutting down and calls killThread. This causes gRPC
|
|
-- core to complain about leaks. I think the cause of
|
|
-- this is that killThread gets called after we are
|
|
-- already in destroyServerCall, and wrapping
|
|
-- uninterruptibleMask doesn't seem to help. Doesn't
|
|
-- crash, but does emit annoying log messages.
|
|
destroy = do
|
|
grpcDebug "withServerCallAsync: destroying."
|
|
destroyServerCall c
|
|
grpcDebug "withServerCallAsync: cleanup finished."
|
|
|
|
-- | A handler for an unregistered server call; bytestring arguments are the
|
|
-- request body and response body respectively.
|
|
type ServerHandler
|
|
= ServerCall
|
|
-> ByteString
|
|
-> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails)
|
|
|
|
-- | Handle one unregistered call.
|
|
serverHandleNormalCall :: Server
|
|
-> MetadataMap -- ^ Initial server metadata.
|
|
-> ServerHandler
|
|
-> IO (Either GRPCIOError ())
|
|
serverHandleNormalCall s initMeta f =
|
|
withServerCall s $ \c -> serverHandleNormalCall' s c initMeta f
|
|
|
|
serverHandleNormalCall' :: Server
|
|
-> ServerCall
|
|
-> MetadataMap -- ^ Initial server metadata.
|
|
-> ServerHandler
|
|
-> IO (Either GRPCIOError ())
|
|
serverHandleNormalCall'
|
|
_ sc@ServerCall{ unsafeSC = c, callCQ = cq, .. } initMeta f = do
|
|
grpcDebug "serverHandleNormalCall(U): starting batch."
|
|
runOps c cq
|
|
[ OpSendInitialMetadata initMeta
|
|
, OpRecvMessage
|
|
]
|
|
>>= \case
|
|
Left x -> do
|
|
grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
|
|
return $ Left x
|
|
Right [OpRecvMessageResult (Just body)] -> do
|
|
grpcDebug $ "got client metadata: " ++ show metadata
|
|
grpcDebug $ "call_details host is: " ++ show callHost
|
|
(rsp, trailMeta, st, ds) <- f sc body
|
|
-- TODO: We have to put 'OpRecvCloseOnServer' in the response ops,
|
|
-- or else the client times out. Given this, I have no idea how to
|
|
-- check for cancellation on the server.
|
|
runOps c cq
|
|
[ OpRecvCloseOnServer
|
|
, OpSendMessage rsp,
|
|
OpSendStatusFromServer trailMeta st ds
|
|
]
|
|
>>= \case
|
|
Left x -> do
|
|
grpcDebug "serverHandleNormalCall(U): resp failed."
|
|
return $ Left x
|
|
Right _ -> do
|
|
grpcDebug "serverHandleNormalCall(U): ops done."
|
|
return $ Right ()
|
|
x -> error $ "impossible pattern match: " ++ show x
|
|
|
|
serverReader :: Server
|
|
-> ServerCall
|
|
-> MetadataMap -- ^ Initial server metadata
|
|
-> ServerReaderHandlerLL
|
|
-> IO (Either GRPCIOError ())
|
|
serverReader s = serverReader' s . convertCall
|
|
|
|
serverWriter :: Server
|
|
-> ServerCall
|
|
-> MetadataMap -- ^ Initial server metadata
|
|
-> ServerWriterHandlerLL
|
|
-> IO (Either GRPCIOError ())
|
|
serverWriter s sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
|
runExceptT $ do
|
|
bs <- recvInitialMessage c ccq
|
|
ExceptT (serverWriter' s (const bs <$> convertCall sc) initMeta f)
|
|
|
|
serverRW :: Server
|
|
-> ServerCall
|
|
-> MetadataMap -- ^ Initial server metadata
|
|
-> ServerRWHandlerLL
|
|
-> IO (Either GRPCIOError ())
|
|
serverRW s = serverRW' s . convertCall
|