mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-11-26 21:19:43 +01:00
fix up handler type, tweak ServerCall record names (#42)
* fix up handler type, tweak ServerCall record names * remove ' from handler types, use LL suffix for low-level handlers * fix all build warnings
This commit is contained in:
parent
e4a28e9e4b
commit
99e6f0652d
14 changed files with 108 additions and 146 deletions
|
@ -13,7 +13,6 @@ import Data.Word
|
|||
import GHC.Generics (Generic)
|
||||
import Network.GRPC.LowLevel
|
||||
import qualified Network.GRPC.LowLevel.Client.Unregistered as U
|
||||
import Proto3.Wire.Decode (ParseError)
|
||||
|
||||
echoMethod = MethodName "/echo.Echo/DoEcho"
|
||||
addMethod = MethodName "/echo.Add/DoAdd"
|
||||
|
|
|
@ -47,8 +47,7 @@ regMain = withGRPC $ \grpc -> do
|
|||
forever $ do
|
||||
let method = head (normalMethods server)
|
||||
result <- serverHandleNormalCall server method serverMeta $
|
||||
\_call reqBody _reqMeta -> return (reqBody, serverMeta, StatusOk,
|
||||
StatusDetails "")
|
||||
\call -> return (payload call, serverMeta, StatusOk, StatusDetails "")
|
||||
case result of
|
||||
Left x -> putStrLn $ "registered call result error: " ++ show x
|
||||
Right _ -> return ()
|
||||
|
@ -61,8 +60,8 @@ regLoop :: Server -> RegisteredMethod 'Normal -> IO ()
|
|||
regLoop server method = forever $ do
|
||||
-- tputStrLn "about to block on call handler"
|
||||
result <- serverHandleNormalCall server method serverMeta $
|
||||
\_call reqBody _reqMeta ->
|
||||
return (reqBody, serverMeta, StatusOk, StatusDetails "")
|
||||
\call ->
|
||||
return (payload call, serverMeta, StatusOk, StatusDetails "")
|
||||
case result of
|
||||
Left x -> error $! "registered call result error: " ++ show x
|
||||
Right _ -> return ()
|
||||
|
@ -75,7 +74,7 @@ regMainThreaded = do
|
|||
let method = head (normalMethods server)
|
||||
tids <- replicateM 7 $ async $ do tputStrLn "starting handler"
|
||||
regLoop server method
|
||||
waitAnyCancel tids
|
||||
_ <- waitAnyCancel tids
|
||||
tputStrLn "finishing"
|
||||
|
||||
-- NB: If you change these, make sure to change them in the client as well.
|
||||
|
@ -86,9 +85,9 @@ instance Message EchoRequest
|
|||
echoHandler :: Handler 'Normal
|
||||
echoHandler =
|
||||
UnaryHandler "/echo.Echo/DoEcho" $
|
||||
\_c body m -> do
|
||||
return ( body :: EchoRequest
|
||||
, m
|
||||
\call -> do
|
||||
return ( payload call :: EchoRequest
|
||||
, metadata call
|
||||
, StatusOk
|
||||
, StatusDetails ""
|
||||
)
|
||||
|
@ -104,12 +103,13 @@ instance Message AddResponse
|
|||
addHandler :: Handler 'Normal
|
||||
addHandler =
|
||||
UnaryHandler "/echo.Add/DoAdd" $
|
||||
\_c b m -> do
|
||||
\c -> do
|
||||
--tputStrLn $ "UnaryHandler for DoAdd hit, b=" ++ show b
|
||||
let b = payload c
|
||||
print (addX b)
|
||||
print (addY b)
|
||||
return ( AddResponse $ addX b + addY b
|
||||
, m
|
||||
, metadata c
|
||||
, StatusOk
|
||||
, StatusDetails ""
|
||||
)
|
||||
|
|
|
@ -15,45 +15,41 @@ import qualified Data.ByteString.Lazy as BL
|
|||
import Data.Protobuf.Wire.Class
|
||||
import Network.GRPC.LowLevel
|
||||
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import qualified Network.GRPC.LowLevel.Server.Unregistered as U
|
||||
|
||||
type ServerHandler' a b =
|
||||
forall c .
|
||||
ServerCall c
|
||||
-> a
|
||||
-> MetadataMap
|
||||
type ServerHandler a b =
|
||||
ServerCall a
|
||||
-> IO (b, MetadataMap, StatusCode, StatusDetails)
|
||||
|
||||
convertServerHandler :: (Message a, Message b)
|
||||
=> ServerHandler' a b
|
||||
-> ServerHandler
|
||||
convertServerHandler f c bs m = case fromByteString bs of
|
||||
=> ServerHandler a b
|
||||
-> ServerHandlerLL
|
||||
convertServerHandler f c = case fromByteString (payload c) of
|
||||
Left x -> error $ "Failed to deserialize message: " ++ show x
|
||||
Right x -> do (y, tm, sc, sd) <- f c x m
|
||||
Right x -> do (y, tm, sc, sd) <- f (fmap (const x) c)
|
||||
return (toBS y, tm, sc, sd)
|
||||
|
||||
type ServerReaderHandler' a b =
|
||||
type ServerReaderHandler a b =
|
||||
ServerCall ()
|
||||
-> StreamRecv a
|
||||
-> Streaming (Maybe b, MetadataMap, StatusCode, StatusDetails)
|
||||
|
||||
convertServerReaderHandler :: (Message a, Message b)
|
||||
=> ServerReaderHandler' a b
|
||||
-> ServerReaderHandler
|
||||
=> ServerReaderHandler a b
|
||||
-> ServerReaderHandlerLL
|
||||
convertServerReaderHandler f c recv =
|
||||
serialize <$> f c (convertRecv recv)
|
||||
where
|
||||
serialize (mmsg, m, sc, sd) = (toBS <$> mmsg, m, sc, sd)
|
||||
|
||||
type ServerWriterHandler' a b =
|
||||
type ServerWriterHandler a b =
|
||||
ServerCall a
|
||||
-> StreamSend b
|
||||
-> Streaming (MetadataMap, StatusCode, StatusDetails)
|
||||
|
||||
convertServerWriterHandler :: (Message a, Message b) =>
|
||||
ServerWriterHandler' a b
|
||||
-> ServerWriterHandler
|
||||
ServerWriterHandler a b
|
||||
-> ServerWriterHandlerLL
|
||||
convertServerWriterHandler f c send =
|
||||
f (convert <$> c) (convertSend send)
|
||||
where
|
||||
|
@ -61,15 +57,15 @@ convertServerWriterHandler f c send =
|
|||
Left x -> error $ "deserialization error: " ++ show x -- TODO FIXME
|
||||
Right x -> x
|
||||
|
||||
type ServerRWHandler' a b =
|
||||
type ServerRWHandler a b =
|
||||
ServerCall ()
|
||||
-> StreamRecv a
|
||||
-> StreamSend b
|
||||
-> Streaming (MetadataMap, StatusCode, StatusDetails)
|
||||
|
||||
convertServerRWHandler :: (Message a, Message b)
|
||||
=> ServerRWHandler' a b
|
||||
-> ServerRWHandler
|
||||
=> ServerRWHandler a b
|
||||
-> ServerRWHandlerLL
|
||||
convertServerRWHandler f c recv send =
|
||||
f c (convertRecv recv) (convertSend send)
|
||||
|
||||
|
@ -93,25 +89,25 @@ data Handler (a :: GRPCMethodType) where
|
|||
UnaryHandler
|
||||
:: (Message c, Message d)
|
||||
=> MethodName
|
||||
-> ServerHandler' c d
|
||||
-> ServerHandler c d
|
||||
-> Handler 'Normal
|
||||
|
||||
ClientStreamHandler
|
||||
:: (Message c, Message d)
|
||||
=> MethodName
|
||||
-> ServerReaderHandler' c d
|
||||
-> ServerReaderHandler c d
|
||||
-> Handler 'ClientStreaming
|
||||
|
||||
ServerStreamHandler
|
||||
:: (Message c, Message d)
|
||||
=> MethodName
|
||||
-> ServerWriterHandler' c d
|
||||
-> ServerWriterHandler c d
|
||||
-> Handler 'ServerStreaming
|
||||
|
||||
BiDiStreamHandler
|
||||
:: (Message c, Message d)
|
||||
=> MethodName
|
||||
-> ServerRWHandler' c d
|
||||
-> ServerRWHandler c d
|
||||
-> Handler 'BiDiStreaming
|
||||
|
||||
data AnyHandler = forall (a :: GRPCMethodType) . AnyHandler (Handler a)
|
||||
|
|
|
@ -6,16 +6,11 @@
|
|||
|
||||
module Network.GRPC.HighLevel.Server.Unregistered where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent.Async
|
||||
import Control.Monad
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Protobuf.Wire.Class
|
||||
import Data.Foldable (find)
|
||||
import Network.GRPC.HighLevel.Server
|
||||
import Network.GRPC.LowLevel
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import qualified Network.GRPC.LowLevel.Server.Unregistered as U
|
||||
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
|
||||
|
||||
|
@ -48,48 +43,45 @@ dispatchLoop server hN hC hS hB =
|
|||
handleError f = f >>= handleCallError
|
||||
unaryHandler :: (Message a, Message b) =>
|
||||
U.ServerCall
|
||||
-> ServerHandler' a b
|
||||
-> ServerHandler a b
|
||||
-> IO ()
|
||||
unaryHandler call h =
|
||||
handleError $
|
||||
U.serverHandleNormalCall' server call mempty $ \call' bs -> do
|
||||
let h' = convertServerHandler h
|
||||
h' (fmap (const bs) $ U.convertCall call)
|
||||
bs
|
||||
(U.requestMetadataRecv call)
|
||||
U.serverHandleNormalCall' server call mempty $ \_call' bs ->
|
||||
convertServerHandler h (fmap (const bs) $ U.convertCall call)
|
||||
csHandler :: (Message a, Message b) =>
|
||||
U.ServerCall
|
||||
-> ServerReaderHandler' a b
|
||||
-> ServerReaderHandler a b
|
||||
-> IO ()
|
||||
csHandler call h =
|
||||
handleError $
|
||||
U.serverReader server call mempty (convertServerReaderHandler h)
|
||||
ssHandler :: (Message a, Message b) =>
|
||||
U.ServerCall
|
||||
-> ServerWriterHandler' a b
|
||||
-> ServerWriterHandler a b
|
||||
-> IO ()
|
||||
ssHandler call h =
|
||||
handleError $
|
||||
U.serverWriter server call mempty (convertServerWriterHandler h)
|
||||
bdHandler :: (Message a, Message b) =>
|
||||
U.ServerCall
|
||||
-> ServerRWHandler' a b
|
||||
-> ServerRWHandler a b
|
||||
-> IO ()
|
||||
bdHandler call h =
|
||||
handleError $
|
||||
U.serverRW server call mempty (convertServerRWHandler h)
|
||||
|
||||
serverLoop :: ServerOptions -> IO ()
|
||||
serverLoop opts@ServerOptions{..} =
|
||||
serverLoop ServerOptions{..} =
|
||||
withGRPC $ \grpc ->
|
||||
withServer grpc (mkConfig opts) $ \server -> do
|
||||
withServer grpc config $ \server -> do
|
||||
dispatchLoop server
|
||||
optNormalHandlers
|
||||
optClientStreamHandlers
|
||||
optServerStreamHandlers
|
||||
optBiDiStreamHandlers
|
||||
where
|
||||
mkConfig ServerOptions{..} =
|
||||
config =
|
||||
ServerConfig
|
||||
{ host = "localhost"
|
||||
, port = optServerPort
|
||||
|
|
|
@ -35,19 +35,19 @@ GRPC
|
|||
, ServerConfig(..)
|
||||
, Server(normalMethods, sstreamingMethods, cstreamingMethods,
|
||||
bidiStreamingMethods)
|
||||
, ServerCall(optionalPayload, requestMetadataRecv)
|
||||
, ServerCall(payload, metadata)
|
||||
, withServer
|
||||
, serverHandleNormalCall
|
||||
, ServerHandler
|
||||
, ServerHandlerLL
|
||||
, withServerCall
|
||||
, serverCallCancel
|
||||
, serverCallIsExpired
|
||||
, serverReader -- for client streaming
|
||||
, ServerReaderHandler
|
||||
, ServerReaderHandlerLL
|
||||
, serverWriter -- for server streaming
|
||||
, ServerWriterHandler
|
||||
, ServerWriterHandlerLL
|
||||
, serverRW -- for bidirectional streaming
|
||||
, ServerRWHandler
|
||||
, ServerRWHandlerLL
|
||||
|
||||
-- * Client
|
||||
, ClientConfig(..)
|
||||
|
|
|
@ -48,13 +48,15 @@ type family MethodPayload a where
|
|||
--TODO: try replacing this class with a plain old function so we don't have the
|
||||
-- Payloadable constraint everywhere.
|
||||
|
||||
payload :: RegisteredMethod mt -> Ptr C.ByteBuffer -> IO (MethodPayload mt)
|
||||
payload (RegisteredMethodNormal _ _ _) p =
|
||||
extractPayload :: RegisteredMethod mt
|
||||
-> Ptr C.ByteBuffer
|
||||
-> IO (MethodPayload mt)
|
||||
extractPayload (RegisteredMethodNormal _ _ _) p =
|
||||
peek p >>= C.copyByteBufferToByteString
|
||||
payload (RegisteredMethodClientStreaming _ _ _) _ = return ()
|
||||
payload (RegisteredMethodServerStreaming _ _ _) p =
|
||||
extractPayload (RegisteredMethodClientStreaming _ _ _) _ = return ()
|
||||
extractPayload (RegisteredMethodServerStreaming _ _ _) p =
|
||||
peek p >>= C.copyByteBufferToByteString
|
||||
payload (RegisteredMethodBiDiStreaming _ _ _) _ = return ()
|
||||
extractPayload (RegisteredMethodBiDiStreaming _ _ _) _ = return ()
|
||||
|
||||
newtype MethodName = MethodName {unMethodName :: String}
|
||||
deriving (Show, Eq, IsString)
|
||||
|
@ -147,8 +149,8 @@ clientCallCancel cc = C.grpcCallCancel (unsafeCC cc) C.reserved
|
|||
data ServerCall a = ServerCall
|
||||
{ unsafeSC :: C.Call
|
||||
, callCQ :: CompletionQueue
|
||||
, requestMetadataRecv :: MetadataMap
|
||||
, optionalPayload :: a
|
||||
, metadata :: MetadataMap
|
||||
, payload :: a
|
||||
, callDeadline :: TimeSpec
|
||||
} deriving (Functor, Show)
|
||||
|
||||
|
@ -194,7 +196,7 @@ debugServerCall sc@(ServerCall (C.Call ptr) _ _ _ _) = do
|
|||
let dbug = grpcDebug . ("debugServerCall(R): " ++)
|
||||
dbug $ "server call: " ++ show ptr
|
||||
dbug $ "callCQ: " ++ show (callCQ sc)
|
||||
dbug $ "metadata ptr: " ++ show (requestMetadataRecv sc)
|
||||
dbug $ "metadata: " ++ show (metadata sc)
|
||||
dbug $ "deadline ptr: " ++ show (callDeadline sc)
|
||||
#else
|
||||
{-# INLINE debugServerCall #-}
|
||||
|
|
|
@ -2,15 +2,8 @@
|
|||
|
||||
module Network.GRPC.LowLevel.Call.Unregistered where
|
||||
|
||||
import Control.Monad
|
||||
import Foreign.Marshal.Alloc (free)
|
||||
import Foreign.Ptr (Ptr)
|
||||
#ifdef DEBUG
|
||||
import Foreign.Storable (peek)
|
||||
#endif
|
||||
import qualified Network.GRPC.LowLevel.Call as Reg
|
||||
import Network.GRPC.LowLevel.CompletionQueue
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Internal
|
||||
import Network.GRPC.LowLevel.GRPC (MetadataMap,
|
||||
grpcDebug)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
|
@ -22,7 +15,7 @@ import System.Clock (TimeSpec)
|
|||
data ServerCall = ServerCall
|
||||
{ unsafeSC :: C.Call
|
||||
, callCQ :: CompletionQueue
|
||||
, requestMetadataRecv :: MetadataMap
|
||||
, metadata :: MetadataMap
|
||||
, callDeadline :: TimeSpec
|
||||
, callMethod :: Reg.MethodName
|
||||
, callHost :: Reg.Host
|
||||
|
@ -30,7 +23,7 @@ data ServerCall = ServerCall
|
|||
|
||||
convertCall :: ServerCall -> Reg.ServerCall ()
|
||||
convertCall ServerCall{..} =
|
||||
Reg.ServerCall unsafeSC callCQ requestMetadataRecv () callDeadline
|
||||
Reg.ServerCall unsafeSC callCQ metadata () callDeadline
|
||||
|
||||
serverCallCancel :: ServerCall -> C.StatusCode -> String -> IO ()
|
||||
serverCallCancel sc code reason =
|
||||
|
@ -43,7 +36,7 @@ debugServerCall ServerCall{..} = do
|
|||
dbug = grpcDebug . ("debugServerCall(U): " ++)
|
||||
|
||||
dbug $ "server call: " ++ show ptr
|
||||
dbug $ "metadata: " ++ show requestMetadataRecv
|
||||
dbug $ "metadata: " ++ show metadata
|
||||
|
||||
dbug $ "deadline: " ++ show callDeadline
|
||||
dbug $ "method: " ++ show callMethod
|
||||
|
|
|
@ -35,25 +35,19 @@ module Network.GRPC.LowLevel.CompletionQueue
|
|||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM (atomically,
|
||||
check)
|
||||
import Control.Concurrent.STM.TVar (newTVarIO,
|
||||
readTVar,
|
||||
writeTVar)
|
||||
import Control.Concurrent.STM.TVar (newTVarIO)
|
||||
import Control.Exception (bracket)
|
||||
import Control.Monad.Managed
|
||||
import Control.Monad.Trans.Class (MonadTrans (lift))
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.IORef (newIORef)
|
||||
import Data.List (intersperse)
|
||||
import Foreign.Marshal.Alloc (free, malloc)
|
||||
import Foreign.Ptr (Ptr, nullPtr)
|
||||
import Foreign.Storable (Storable, peek)
|
||||
import Foreign.Ptr (nullPtr)
|
||||
import Foreign.Storable (peek)
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Internal
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Constants as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
|
@ -136,7 +130,7 @@ serverRequestCall rm s scq ccq =
|
|||
<$> peek call
|
||||
<*> return ccq
|
||||
<*> C.getAllMetadataArray md
|
||||
<*> payload rm pay
|
||||
<*> extractPayload rm pay
|
||||
<*> convertDeadline dead
|
||||
_ -> do
|
||||
lift $ dbug $ "Throwing callError: " ++ show ce
|
||||
|
|
|
@ -8,13 +8,10 @@
|
|||
|
||||
module Network.GRPC.LowLevel.CompletionQueue.Unregistered where
|
||||
|
||||
import Control.Exception (bracket)
|
||||
import Control.Monad.Managed
|
||||
import Control.Monad.Trans.Class (MonadTrans (lift))
|
||||
import Control.Monad.Trans.Except
|
||||
import Foreign.Marshal.Alloc (free, malloc)
|
||||
import Foreign.Ptr (Ptr)
|
||||
import Foreign.Storable (Storable, peek)
|
||||
import Foreign.Storable (peek)
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Internal
|
||||
|
|
|
@ -13,10 +13,6 @@ import qualified Network.GRPC.Unsafe as C
|
|||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import Proto3.Wire.Decode (ParseError)
|
||||
|
||||
#ifdef DEBUG
|
||||
import GHC.Conc (myThreadId)
|
||||
#endif
|
||||
|
||||
type MetadataMap = M.Map B.ByteString B.ByteString
|
||||
|
||||
newtype StatusDetails = StatusDetails B.ByteString
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
|
||||
module Network.GRPC.LowLevel.Op where
|
||||
|
||||
import Control.Arrow
|
||||
import Control.Exception
|
||||
import Control.Monad
|
||||
import Control.Monad.Trans.Class (MonadTrans(lift))
|
||||
|
@ -28,7 +27,6 @@ import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
|||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import qualified Network.GRPC.Unsafe.Slice as C (Slice, freeSlice)
|
||||
import Pipes ((>->))
|
||||
import qualified Pipes as P
|
||||
import qualified Pipes.Core as P
|
||||
|
||||
|
|
|
@ -25,8 +25,7 @@ import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
|
|||
serverRegisterCompletionQueue,
|
||||
serverRequestCall,
|
||||
serverShutdownAndNotify,
|
||||
shutdownCompletionQueue,
|
||||
withCompletionQueue)
|
||||
shutdownCompletionQueue)
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.Op
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
|
@ -249,7 +248,7 @@ withServerCall s rm f =
|
|||
--------------------------------------------------------------------------------
|
||||
-- serverReader (server side of client streaming mode)
|
||||
|
||||
type ServerReaderHandler
|
||||
type ServerReaderHandlerLL
|
||||
= ServerCall ()
|
||||
-> StreamRecv ByteString
|
||||
-> Streaming (Maybe ByteString, MetadataMap, C.StatusCode, StatusDetails)
|
||||
|
@ -257,7 +256,7 @@ type ServerReaderHandler
|
|||
serverReader :: Server
|
||||
-> RegisteredMethod 'ClientStreaming
|
||||
-> MetadataMap -- ^ initial server metadata
|
||||
-> ServerReaderHandler
|
||||
-> ServerReaderHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverReader s rm initMeta f = withServerCall s rm go
|
||||
where
|
||||
|
@ -273,7 +272,7 @@ serverReader s rm initMeta f = withServerCall s rm go
|
|||
--------------------------------------------------------------------------------
|
||||
-- serverWriter (server side of server streaming mode)
|
||||
|
||||
type ServerWriterHandler
|
||||
type ServerWriterHandlerLL
|
||||
= ServerCall ByteString
|
||||
-> StreamSend ByteString
|
||||
-> Streaming (MetadataMap, C.StatusCode, StatusDetails)
|
||||
|
@ -283,7 +282,7 @@ serverWriter :: Server
|
|||
-> RegisteredMethod 'ServerStreaming
|
||||
-> MetadataMap
|
||||
-- ^ Initial server metadata
|
||||
-> ServerWriterHandler
|
||||
-> ServerWriterHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverWriter s rm initMeta f = withServerCall s rm go
|
||||
where
|
||||
|
@ -295,7 +294,7 @@ serverWriter s rm initMeta f = withServerCall s rm go
|
|||
--------------------------------------------------------------------------------
|
||||
-- serverRW (server side of bidirectional streaming mode)
|
||||
|
||||
type ServerRWHandler
|
||||
type ServerRWHandlerLL
|
||||
= ServerCall ()
|
||||
-> StreamRecv ByteString
|
||||
-> StreamSend ByteString
|
||||
|
@ -305,7 +304,7 @@ serverRW :: Server
|
|||
-> RegisteredMethod 'BiDiStreaming
|
||||
-> MetadataMap
|
||||
-- ^ initial server metadata
|
||||
-> ServerRWHandler
|
||||
-> ServerRWHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverRW s rm initMeta f = withServerCall s rm go
|
||||
where
|
||||
|
@ -323,10 +322,8 @@ serverRW s rm initMeta f = withServerCall s rm go
|
|||
-- values in the result tuple being the initial and trailing metadata
|
||||
-- respectively. We pass in the 'ServerCall' so that the server can call
|
||||
-- 'serverCallCancel' on it if needed.
|
||||
type ServerHandler
|
||||
type ServerHandlerLL
|
||||
= ServerCall ByteString
|
||||
-> ByteString
|
||||
-> MetadataMap
|
||||
-> IO (ByteString, MetadataMap, C.StatusCode, StatusDetails)
|
||||
|
||||
-- | Wait for and then handle a normal (non-streaming) call.
|
||||
|
@ -334,13 +331,13 @@ serverHandleNormalCall :: Server
|
|||
-> RegisteredMethod 'Normal
|
||||
-> MetadataMap
|
||||
-- ^ Initial server metadata
|
||||
-> ServerHandler
|
||||
-> ServerHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverHandleNormalCall s rm initMeta f =
|
||||
withServerCall s rm go
|
||||
where
|
||||
go sc@ServerCall{..} = do
|
||||
(rsp, trailMeta, st, ds) <- f sc optionalPayload requestMetadataRecv
|
||||
(rsp, trailMeta, st, ds) <- f sc
|
||||
void <$> runOps unsafeSC callCQ
|
||||
[ OpSendInitialMetadata initMeta
|
||||
, OpRecvCloseOnServer
|
||||
|
|
|
@ -9,9 +9,7 @@ import Control.Monad
|
|||
import Control.Monad.Trans.Except
|
||||
import Data.ByteString (ByteString)
|
||||
import Network.GRPC.LowLevel.Call.Unregistered
|
||||
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue
|
||||
, withCompletionQueue
|
||||
, createCompletionQueue)
|
||||
import Network.GRPC.LowLevel.CompletionQueue (createCompletionQueue)
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Unregistered (serverRequestCall)
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.Op (Op (..)
|
||||
|
@ -25,9 +23,9 @@ import Network.GRPC.LowLevel.Op (Op (..)
|
|||
, sendStatusFromServer
|
||||
, recvInitialMessage)
|
||||
import Network.GRPC.LowLevel.Server (Server (..)
|
||||
, ServerReaderHandler
|
||||
, ServerWriterHandler
|
||||
, ServerRWHandler)
|
||||
, ServerReaderHandlerLL
|
||||
, ServerWriterHandlerLL
|
||||
, ServerRWHandlerLL)
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
|
||||
serverCreateCall :: Server
|
||||
|
@ -56,7 +54,7 @@ withServerCallAsync :: Server
|
|||
-> IO ()
|
||||
withServerCallAsync s f =
|
||||
serverCreateCall s >>= \case
|
||||
Left e -> return ()
|
||||
Left _ -> return ()
|
||||
Right c -> void $ forkIO (f c `finally` do
|
||||
grpcDebug "withServerCallAsync: destroying."
|
||||
destroyServerCall c)
|
||||
|
@ -102,7 +100,7 @@ serverHandleNormalCall' :: Server
|
|||
-> ServerHandler
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverHandleNormalCall'
|
||||
s sc@ServerCall{ unsafeSC = c, callCQ = cq, .. } initMeta f = do
|
||||
_ sc@ServerCall{ unsafeSC = c, callCQ = cq, .. } initMeta f = do
|
||||
grpcDebug "serverHandleNormalCall(U): starting batch."
|
||||
runOps c cq
|
||||
[ OpSendInitialMetadata initMeta
|
||||
|
@ -113,7 +111,7 @@ serverHandleNormalCall'
|
|||
grpcDebug "serverHandleNormalCall(U): ops failed; aborting"
|
||||
return $ Left x
|
||||
Right [OpRecvMessageResult (Just body)] -> do
|
||||
grpcDebug $ "got client metadata: " ++ show requestMetadataRecv
|
||||
grpcDebug $ "got client metadata: " ++ show metadata
|
||||
grpcDebug $ "call_details host is: " ++ show callHost
|
||||
(rsp, trailMeta, st, ds) <- f sc body
|
||||
runOps c cq
|
||||
|
@ -133,9 +131,9 @@ serverHandleNormalCall'
|
|||
serverReader :: Server
|
||||
-> ServerCall
|
||||
-> MetadataMap -- ^ initial server metadata
|
||||
-> ServerReaderHandler
|
||||
-> ServerReaderHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverReader s sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
serverReader _ sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
runExceptT $ do
|
||||
(mmsg, trailMeta, st, ds) <-
|
||||
runStreamingProxy "serverReader" c ccq (f (convertCall sc) streamRecv)
|
||||
|
@ -149,9 +147,9 @@ serverWriter :: Server
|
|||
-> ServerCall
|
||||
-> MetadataMap
|
||||
-- ^ Initial server metadata
|
||||
-> ServerWriterHandler
|
||||
-> ServerWriterHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverWriter s sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
serverWriter _ sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
runExceptT $ do
|
||||
bs <- recvInitialMessage c ccq
|
||||
sendInitialMetadata c ccq initMeta
|
||||
|
@ -163,9 +161,9 @@ serverRW :: Server
|
|||
-> ServerCall
|
||||
-> MetadataMap
|
||||
-- ^ initial server metadata
|
||||
-> ServerRWHandler
|
||||
-> ServerRWHandlerLL
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverRW s sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
serverRW _ sc@ServerCall{ unsafeSC = c, callCQ = ccq } initMeta f =
|
||||
runExceptT $ do
|
||||
sendInitialMetadata c ccq initMeta
|
||||
let regCall = convertCall sc
|
||||
|
|
|
@ -105,8 +105,8 @@ testMixRegisteredUnregistered =
|
|||
return ()
|
||||
where regThread = do
|
||||
let rm = head (normalMethods s)
|
||||
r <- serverHandleNormalCall s rm dummyMeta $ \_ body _ -> do
|
||||
body @?= "Hello"
|
||||
r <- serverHandleNormalCall s rm dummyMeta $ \c -> do
|
||||
payload c @?= "Hello"
|
||||
return ("reply test", dummyMeta, StatusOk, "")
|
||||
return ()
|
||||
unregThread = do
|
||||
|
@ -138,9 +138,9 @@ testPayload =
|
|||
trailMD @?= dummyMeta
|
||||
server s = do
|
||||
let rm = head (normalMethods s)
|
||||
r <- serverHandleNormalCall s rm dummyMeta $ \_ reqBody reqMD -> do
|
||||
reqBody @?= "Hello!"
|
||||
checkMD "Server metadata mismatch" clientMD reqMD
|
||||
r <- serverHandleNormalCall s rm dummyMeta $ \c -> do
|
||||
payload c @?= "Hello!"
|
||||
checkMD "Server metadata mismatch" clientMD (metadata c)
|
||||
return ("reply test", dummyMeta, StatusOk, "details string")
|
||||
r @?= Right ()
|
||||
|
||||
|
@ -154,7 +154,7 @@ testServerCancel =
|
|||
res @?= badStatus StatusCancelled
|
||||
server s = do
|
||||
let rm = head (normalMethods s)
|
||||
r <- serverHandleNormalCall s rm mempty $ \c _ _ -> do
|
||||
r <- serverHandleNormalCall s rm mempty $ \c -> do
|
||||
serverCallCancel c StatusCancelled ""
|
||||
return (mempty, mempty, StatusCancelled, "")
|
||||
r @?= Right ()
|
||||
|
@ -181,8 +181,8 @@ testServerStreaming =
|
|||
r <- serverWriter s rm serverInitMD $ \sc send -> do
|
||||
liftIO $ do
|
||||
checkMD "Server request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
optionalPayload sc @?= clientPay
|
||||
clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
forM_ pays $ \p -> send p `is` Right ()
|
||||
return (dummyMeta, StatusOk, "dtls")
|
||||
r @?= Right ()
|
||||
|
@ -212,8 +212,8 @@ testServerStreamingUnregistered =
|
|||
r <- U.serverWriter s call serverInitMD $ \sc send -> do
|
||||
liftIO $ do
|
||||
checkMD "Server request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
optionalPayload sc @?= clientPay
|
||||
clientInitMD (metadata sc)
|
||||
payload sc @?= clientPay
|
||||
forM_ pays $ \p -> send p `is` Right ()
|
||||
return (dummyMeta, StatusOk, "dtls")
|
||||
r @?= Right ()
|
||||
|
@ -241,7 +241,7 @@ testClientStreaming =
|
|||
let rm = head (cstreamingMethods s)
|
||||
eea <- serverReader s rm serverInitMD $ \sc recv -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
clientInitMD (metadata sc)
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
return (Just serverRsp, trailMD, serverStatus, serverDtls)
|
||||
|
@ -269,7 +269,7 @@ testClientStreamingUnregistered =
|
|||
server s = U.withServerCallAsync s $ \call -> do
|
||||
eea <- U.serverReader s call serverInitMD $ \sc recv -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
clientInitMD (metadata sc)
|
||||
forM_ pays $ \p -> recv `is` Right (Just p)
|
||||
recv `is` Right Nothing
|
||||
return (Just serverRsp, trailMD, serverStatus, serverDtls)
|
||||
|
@ -301,7 +301,7 @@ testBiDiStreaming =
|
|||
let rm = head (bidiStreamingMethods s)
|
||||
eea <- serverRW s rm serverInitMD $ \sc recv send -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
clientInitMD (metadata sc)
|
||||
recv `is` Right (Just "cw0")
|
||||
send "sw0" `is` Right ()
|
||||
recv `is` Right (Just "cw1")
|
||||
|
@ -336,7 +336,7 @@ testBiDiStreamingUnregistered =
|
|||
server s = U.withServerCallAsync s $ \call -> do
|
||||
eea <- U.serverRW s call serverInitMD $ \sc recv send -> do
|
||||
liftIO $ checkMD "Client request metadata mismatch"
|
||||
clientInitMD (requestMetadataRecv sc)
|
||||
clientInitMD (metadata sc)
|
||||
recv `is` Right (Just "cw0")
|
||||
send "sw0" `is` Right ()
|
||||
recv `is` Right (Just "cw1")
|
||||
|
@ -412,7 +412,7 @@ testSlowServer =
|
|||
result @?= badStatus StatusDeadlineExceeded
|
||||
server s = do
|
||||
let rm = head (normalMethods s)
|
||||
serverHandleNormalCall s rm mempty $ \_ _ _ -> do
|
||||
serverHandleNormalCall s rm mempty $ \_ -> do
|
||||
threadDelay (2*10^(6 :: Int))
|
||||
return dummyResp
|
||||
return ()
|
||||
|
@ -427,7 +427,7 @@ testServerCallExpirationCheck =
|
|||
return ()
|
||||
server s = do
|
||||
let rm = head (normalMethods s)
|
||||
serverHandleNormalCall s rm mempty $ \c _ _ -> do
|
||||
serverHandleNormalCall s rm mempty $ \c -> do
|
||||
exp1 <- serverCallIsExpired c
|
||||
assertBool "Call isn't expired when handler starts" $ not exp1
|
||||
threadDelaySecs 1
|
||||
|
@ -451,8 +451,8 @@ testCustomUserAgent =
|
|||
return ()
|
||||
server = TestServer (serverConf (["/foo"],[],[],[])) $ \s -> do
|
||||
let rm = head (normalMethods s)
|
||||
serverHandleNormalCall s rm mempty $ \_ _ meta -> do
|
||||
let ua = meta M.! "user-agent"
|
||||
serverHandleNormalCall s rm mempty $ \c -> do
|
||||
let ua = (metadata c) M.! "user-agent"
|
||||
assertBool "User agent prefix is present" $ isPrefixOf "prefix!" ua
|
||||
assertBool "User agent suffix is present" $ isSuffixOf "suffix!" ua
|
||||
return dummyResp
|
||||
|
@ -472,8 +472,8 @@ testClientCompression =
|
|||
return ()
|
||||
server = TestServer (serverConf (["/foo"],[],[],[])) $ \s -> do
|
||||
let rm = head (normalMethods s)
|
||||
serverHandleNormalCall s rm mempty $ \_ body _ -> do
|
||||
body @?= "hello"
|
||||
serverHandleNormalCall s rm mempty $ \c -> do
|
||||
payload c @?= "hello"
|
||||
return dummyResp
|
||||
return ()
|
||||
|
||||
|
@ -500,8 +500,8 @@ testClientServerCompression =
|
|||
[CompressionAlgArg GrpcCompressDeflate]
|
||||
server = TestServer sconf $ \s -> do
|
||||
let rm = head (normalMethods s)
|
||||
serverHandleNormalCall s rm dummyMeta $ \_sc body _ -> do
|
||||
body @?= "hello"
|
||||
serverHandleNormalCall s rm dummyMeta $ \sc -> do
|
||||
payload sc @?= "hello"
|
||||
return ("hello", dummyMeta, StatusOk, StatusDetails "")
|
||||
return ()
|
||||
|
||||
|
@ -517,9 +517,9 @@ dummyMeta = [("foo","bar")]
|
|||
dummyResp :: (ByteString, MetadataMap, StatusCode, StatusDetails)
|
||||
dummyResp = ("", mempty, StatusOk, StatusDetails "")
|
||||
|
||||
dummyHandler :: ServerCall a -> ByteString -> MetadataMap
|
||||
dummyHandler :: ServerCall a
|
||||
-> IO (ByteString, MetadataMap, StatusCode, StatusDetails)
|
||||
dummyHandler _ _ _ = return dummyResp
|
||||
dummyHandler _ = return dummyResp
|
||||
|
||||
dummyResult' :: StatusDetails
|
||||
-> IO (ByteString, MetadataMap, StatusCode, StatusDetails)
|
||||
|
|
Loading…
Reference in a new issue