gRPC-haskell/src/Network/GRPC/LowLevel/Server.hs
Connor Clark ce56953b24 Various example/benchmarking code (#16)
* initial echo client/server examples

* registered and unregistered versions of the example client

* ignore pyc files

* cpp echo code, flag to build examples

* threaded server example
2016-06-03 10:34:09 -07:00

267 lines
13 KiB
Haskell

{-# LANGUAGE RecordWildCards #-}
module Network.GRPC.LowLevel.Server where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket, finally)
import Control.Monad
import Data.ByteString (ByteString)
import Foreign.Ptr (nullPtr)
import qualified Network.GRPC.Unsafe as C
import qualified Network.GRPC.Unsafe.Op as C
import Network.GRPC.LowLevel.Call
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
TimeoutSeconds,
createCompletionQueue,
pluck,
serverRegisterCompletionQueue,
serverRequestCall,
serverRequestRegisteredCall,
serverShutdownAndNotify,
shutdownCompletionQueue)
import Network.GRPC.LowLevel.GRPC
import Network.GRPC.LowLevel.Op
-- | Bundle of the gRPC state needed to run a server.
data Server = Server
  { internalServer    :: C.Server
    -- ^ Handle of the underlying gRPC server.
  , serverCQ          :: CompletionQueue
    -- ^ Completion queue the server's operations are run against.
  , registeredMethods :: [RegisteredMethod]
    -- ^ Methods that were registered on the server.
  }
-- | Configuration needed to start a server. More fields may need to be added
-- to this in the future.
data ServerConfig = ServerConfig
  { hostName          :: Host
    -- ^ Name of the host the server is running on. 'startServer' joins it
    -- with 'port' as @host:port@ to form the address it listens on. Setting
    -- it to @\"localhost\"@ works fine in tests.
  , port              :: Int
    -- ^ Port to listen for requests on.
  , methodsToRegister :: [(MethodName, GRPCMethodType)]
    -- ^ List of (method name, method type) pairs specifying all methods to
    -- register. Other, unregistered methods can still be handled with
    -- 'serverHandleNormalCall'.
  }
  deriving (Show, Eq)
-- | Create, configure and start a server according to a 'ServerConfig'.
--
-- The listening address is @host:port@. If the port reported back by
-- 'C.grpcServerAddInsecureHttp2Port' differs from the requested one, this
-- calls 'error'. A fresh completion queue is created and registered with the
-- server, and every method in 'methodsToRegister' is registered before the
-- server is started.
startServer :: GRPC -> ServerConfig -> IO Server
startServer grpc ServerConfig{ hostName = host
                             , port = wantedPort
                             , methodsToRegister = toRegister } = do
  srv <- C.grpcServerCreate nullPtr C.reserved
  boundPort <- C.grpcServerAddInsecureHttp2Port srv address
  unless (boundPort == wantedPort) $
    error ("Unable to bind port: " ++ show wantedPort)
  queue <- createCompletionQueue grpc
  serverRegisterCompletionQueue srv queue
  regMethods <- mapM (registerOn srv) toRegister
  C.grpcServerStart srv
  return (Server srv queue regMethods)
  where
    address = unHost host ++ ":" ++ show wantedPort
    -- Note that the full @host:port@ string is what gets passed as the
    -- method's host.
    registerOn srv (name, mtype) =
      serverRegisterMethod srv name (Host address) mtype
-- | Shut down a server.
--
-- In order: request shutdown and wait for the notification on the server's
-- completion queue, cancel all calls, destroy the server, and finally shut
-- down the completion queue. Calls 'error' if the shutdown notification
-- cannot be plucked; a completion queue that fails to shut down only
-- produces a warning on stdout.
--
-- TODO: Do method handles need to be freed?
stopServer :: Server -> IO ()
stopServer Server{internalServer = srv, serverCQ = queue} = do
  grpcDebug "stopServer: calling shutdownNotify."
  awaitShutdown
  grpcDebug "stopServer: cancelling all calls."
  C.grpcServerCancelAllCalls srv
  grpcDebug "stopServer: call grpc_server_destroy."
  C.grpcServerDestroy srv
  grpcDebug "stopServer: shutting down CQ."
  closeQueue
  where
    notifyTag = C.tag 0

    -- Ask the server to shut down and wait for the matching event.
    -- The timeout argument of 30 is presumably in seconds.
    awaitShutdown = do
      serverShutdownAndNotify srv queue notifyTag
      notified <- pluck queue notifyTag 30
      case notified of
        Right _ -> return ()
        -- This case occurs when we pluck but the queue is already in the
        -- 'shuttingDown' state, implying we already tried to shut down.
        Left GRPCIOShutdown -> error "Called stopServer twice!"
        Left _ -> error "Failed to stop server."

    -- Best effort: a queue that refuses to shut down is only reported.
    closeQueue =
      shutdownCompletionQueue queue
        >>= either (const warnStillOpen) (const (return ()))

    warnStillOpen = do
      putStrLn "Warning: completion queue didn't shut down."
      putStrLn "Trying to stop server anyway."
-- | Run an action against a freshly started server. 'bracket' ensures that
-- 'stopServer' is called afterwards, even if the action throws an exception.
withServer :: GRPC -> ServerConfig -> (Server -> IO a) -> IO a
withServer grpc cfg = bracket acquire stopServer
  where
    acquire = startServer grpc cfg
-- | Register a method on a server, yielding the 'RegisteredMethod' that can
-- later be used to wait for a request to arrive. Note: gRPC claims this must
-- happen before the server is started, so 'startServer' does it for every
-- method listed in the 'ServerConfig'.
--
-- Only 'Normal' methods are supported; any other method type calls 'error'.
serverRegisterMethod
  :: C.Server
  -> MethodName
  -- ^ Method name, e.g. "/foo".
  -> Host
  -- ^ Host name, e.g. "localhost". It is unclear why this is needed, since
  -- a host name already has to be supplied to start the server. It doesn't
  -- seem to have any effect, even if it's filled with nonsense.
  -> GRPCMethodType
  -- ^ Type of method this will be. In the future, this will be used to
  -- switch to the correct handling logic. Currently, the only valid choice
  -- is 'Normal'.
  -> IO RegisteredMethod
serverRegisterMethod srv name host mtype =
  case mtype of
    Normal -> do
      methodHandle <-
        C.grpcServerRegisterMethod srv (unMethodName name) (unHost host)
      grpcDebug ("registered method to handle " ++ show methodHandle)
      return (RegisteredMethod Normal name host methodHandle)
    _ -> error "Streaming methods not implemented yet."
-- | Create a 'ServerRegCall' with which to wait for the invocation of a
-- registered method. This forwards to 'serverRequestRegisteredCall' using the
-- server's own handle and completion queue.
serverCreateRegisteredCall
  :: Server
  -> RegisteredMethod
  -> TimeoutSeconds
  -> MetadataMap
  -> IO (Either GRPCIOError ServerRegCall)
serverCreateRegisteredCall Server{internalServer = srv, serverCQ = queue}
                           method timeLimit initMeta =
  serverRequestRegisteredCall srv queue timeLimit method initMeta
-- | Create a registered call, run the given action on it, and destroy the
-- call afterwards (also when the action throws). If the call cannot be
-- created, the error is returned and the action is never run.
withServerRegisteredCall
  :: Server
  -> RegisteredMethod
  -> TimeoutSeconds
  -> MetadataMap
  -> (ServerRegCall -> IO (Either GRPCIOError a))
  -> IO (Either GRPCIOError a)
withServerRegisteredCall server regmethod timeout initMeta f =
  serverCreateRegisteredCall server regmethod timeout initMeta
    >>= either (return . Left) useThenDestroy
  where
    useThenDestroy call = f call `finally` cleanup call
    cleanup call = do
      grpcDebug "withServerRegisteredCall: destroying."
      destroyServerRegCall call
-- | Create a 'ServerUnregCall' with which to wait for an unregistered call.
-- This forwards to 'serverRequestCall' using the server's own handle and
-- completion queue.
serverCreateUnregCall
  :: Server
  -> TimeoutSeconds
  -> IO (Either GRPCIOError ServerUnregCall)
serverCreateUnregCall Server{internalServer = srv, serverCQ = queue} timeLimit =
  serverRequestCall srv queue timeLimit
-- | Create an unregistered call, run the given action on it, and destroy the
-- call afterwards (also when the action throws). If the call cannot be
-- created, the error is returned and the action is never run.
withServerUnregCall
  :: Server
  -> TimeoutSeconds
  -> (ServerUnregCall -> IO (Either GRPCIOError a))
  -> IO (Either GRPCIOError a)
withServerUnregCall server timeout f =
  serverCreateUnregCall server timeout
    >>= either (return . Left) useThenDestroy
  where
    useThenDestroy call = f call `finally` cleanup call
    cleanup call = do
      grpcDebug "withServerCall: destroying."
      destroyServerUnregCall call
-- | The 'Op's used to receive a normal (non-streaming) call: send the initial
-- metadata, then receive the message.
serverOpsGetNormalCall :: MetadataMap -> [Op]
serverOpsGetNormalCall initMetadata =
  OpSendInitialMetadata initMetadata : [OpRecvMessage]
-- | The 'Op's used to respond to a normal (non-streaming) call: receive the
-- close on the server, send the response body, then send the status together
-- with the given metadata.
serverOpsSendNormalResponse
  :: ByteString
  -> MetadataMap
  -> C.StatusCode
  -> StatusDetails
  -> [Op]
serverOpsSendNormalResponse body metadata code details =
  [recvClose, sendBody, sendStatus]
  where
    recvClose  = OpRecvCloseOnServer
    sendBody   = OpSendMessage body
    sendStatus = OpSendStatusFromServer metadata code details
-- | The 'Op's used to respond to a normal (non-streaming) registered call:
-- the initial metadata is sent first, followed by the same ops that
-- 'serverOpsSendNormalResponse' produces (receive close, send body, send
-- status with the trailing metadata).
serverOpsSendNormalRegisteredResponse
  :: ByteString
  -> MetadataMap
  -- ^ Initial metadata.
  -> MetadataMap
  -- ^ Trailing metadata.
  -> C.StatusCode
  -> StatusDetails
  -> [Op]
serverOpsSendNormalRegisteredResponse body initMetadata trailingMeta code details =
  OpSendInitialMetadata initMetadata
    : serverOpsSendNormalResponse body trailingMeta code details
-- TODO: we will want to replace this with some more general concept that also
-- works with streaming calls in the future.
-- | Wait for and then handle a normal (non-streaming) call to a registered
-- method.
--
-- The request payload and metadata are read from the call and passed to the
-- handler; the handler's result is sent back with status 'C.GrpcStatusOk'.
-- An empty payload currently results in a call to 'error'.
serverHandleNormalRegisteredCall
  :: Server
  -> RegisteredMethod
  -> TimeoutSeconds
  -> MetadataMap
  -- ^ Initial server metadata.
  -> (ByteString -> MetadataMap
      -> IO (ByteString, MetadataMap, MetadataMap, StatusDetails))
  -- ^ Handler function: takes the request body and request metadata and
  -- returns the response body, initial metadata, trailing metadata and
  -- status details.
  -> IO (Either GRPCIOError ())
serverHandleNormalRegisteredCall server@Server{serverCQ = queue}
                                 rm timeLimit srvMetadata f =
  -- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit.
  -- Should we just hard-code time limits instead? Not sure if client
  -- programmer cares, since this function will likely just be put in a loop
  -- anyway.
  withServerRegisteredCall server rm timeLimit srvMetadata handleCall
  where
    handleCall call = do
      grpcDebug "serverHandleNormalRegisteredCall: starting batch."
      debugServerRegCall call
      serverRegCallGetPayload call >>= maybe noPayload (respond call)

    -- TODO: what should we do with an empty payload? Have the handler take
    -- @Maybe ByteString@? Need to figure out when/why payload would be empty.
    noPayload = error "serverHandleNormalRegisteredCall: payload empty."

    -- Run the handler on the request and send its response back.
    respond call requestBody = do
      requestMeta <- serverRegCallGetMetadata call
      (respBody, initMeta, trailingMeta, details) <- f requestBody requestMeta
      let respOps = serverOpsSendNormalRegisteredResponse
                      respBody initMeta trailingMeta C.GrpcStatusOk details
      outcome <- runServerRegOps call queue respOps timeLimit
      grpcDebug "serverHandleNormalRegisteredCall: finished response ops."
      -- Only success or failure is reported; the op results are dropped.
      return (void outcome)
-- TODO: This is preliminary.
-- | Wait for and then handle one unregistered, normal (non-streaming) call.
--
-- The initial server metadata is sent and the request message received in a
-- first batch of ops. The handler is then run on the request and its result
-- is sent back with status 'C.GrpcStatusOk' in a second batch. The same
-- time limit is used for creating the call and for each batch.
serverHandleNormalCall
  :: Server
  -> TimeoutSeconds
  -> MetadataMap
  -- ^ Initial server metadata.
  -> (ByteString -> MetadataMap -> MethodName
      -> IO (ByteString, MetadataMap, StatusDetails))
  -- ^ Handler function: takes the request body, the request metadata and
  -- the name of the method being called, and returns the response body, the
  -- metadata to send along with the status, and the status details.
  -> IO (Either GRPCIOError ())
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f =
  withServerUnregCall s timeLimit $ \call -> do
    grpcDebug "serverHandleNormalCall: starting batch."
    let recvOps = serverOpsGetNormalCall srvMetadata
    opResults <- runServerUnregOps call serverCQ recvOps timeLimit
    case opResults of
      Left x -> do
        grpcDebug "serverHandleNormalCall: ops failed; aborting"
        return $ Left x
      Right [OpRecvMessageResult (Just body)] -> do
        requestMeta <- serverUnregCallGetMetadata call
        grpcDebug $ "got client metadata: " ++ show requestMeta
        methodName <- serverUnregCallGetMethodName call
        -- Named 'callHost' so it does not shadow the 'hostName' record
        -- selector of 'ServerConfig'. It is only used for debug output.
        callHost <- serverUnregCallGetHost call
        grpcDebug $ "call_details host is: " ++ show callHost
        (respBody, respMetadata, details) <- f body requestMeta methodName
        let respOps = serverOpsSendNormalResponse
                        respBody respMetadata C.GrpcStatusOk details
        respOpsResults <- runServerUnregOps call serverCQ respOps timeLimit
        case respOpsResults of
          Left x -> do
            grpcDebug "serverHandleNormalCall: resp failed."
            return $ Left x
          Right _ -> do
            grpcDebug "serverHandleNormalCall: ops done."
            return (Right ())
      -- NOTE(review): this catch-all also covers a successful batch whose
      -- message result is 'Nothing' (an empty payload), so it may not be as
      -- impossible as the message claims -- confirm when that can happen.
      x -> error $ "impossible pattern match: " ++ show x
-- | Never meant to be evaluated. It exists only to mention 'threadDelay' so
-- that its import is not reported as unused.
_nowarn_unused :: a
_nowarn_unused = const undefined threadDelay