mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-11-23 11:39:43 +01:00
Distinguish hostnames vs "host:port" strings; minor echo client cleanup (#20)
* Remove explicit host:port parameter from clientRequest * Save ClientConfig in Client ADT; derive host:port string as needed * Add Port newtype and endpoint string constructor fn * Introduce Endpoint newtype for host:port strings; derive them as needed; misc cleanup * Clean up echo client
This commit is contained in:
parent
6041ae1cb9
commit
d46c0c1c94
7 changed files with 119 additions and 116 deletions
|
@ -1,37 +1,24 @@
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE LambdaCase #-}
|
||||||
|
{-# LANGUAGE OverloadedStrings #-}
|
||||||
|
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
|
||||||
|
{-# OPTIONS_GHC -fno-warn-unused-binds #-}
|
||||||
|
|
||||||
import Control.Concurrent (threadDelay)
|
import Control.Monad
|
||||||
import Control.Monad (forever)
|
import Network.GRPC.LowLevel
|
||||||
import Data.ByteString ()
|
|
||||||
import qualified Data.Map as M
|
|
||||||
import Network.GRPC.LowLevel
|
|
||||||
|
|
||||||
echoMethod :: MethodName
|
|
||||||
echoMethod = MethodName "/echo.Echo/DoEcho"
|
echoMethod = MethodName "/echo.Echo/DoEcho"
|
||||||
|
|
||||||
ntimes :: Int -> IO () -> IO ()
|
unregistered c = do
|
||||||
ntimes 1 f = f
|
clientRequest c echoMethod 1 "hi" mempty
|
||||||
ntimes n f = f >> (ntimes (n-1) f)
|
|
||||||
|
|
||||||
unregClient :: IO ()
|
registered c = do
|
||||||
unregClient = do
|
meth <- clientRegisterMethod c echoMethod Normal
|
||||||
withGRPC $ \grpc ->
|
clientRegisteredRequest c meth 1 "hi" mempty
|
||||||
withClient grpc (ClientConfig "localhost" 50051) $ \client ->
|
|
||||||
ntimes 100000 $ do
|
|
||||||
reqResult <- clientRequest client echoMethod "localhost:50051" 1 "hi" M.empty
|
|
||||||
case reqResult of
|
|
||||||
Left x -> error $ "Got client error: " ++ show x
|
|
||||||
Right resp -> return ()
|
|
||||||
|
|
||||||
regClient :: IO ()
|
run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c ->
|
||||||
regClient = do
|
f c >>= \case
|
||||||
withGRPC $ \grpc ->
|
Left e -> error $ "Got client error: " ++ show e
|
||||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> ntimes 100000 $ do
|
_ -> return ()
|
||||||
regMethod <- clientRegisterMethod client echoMethod Normal
|
|
||||||
reqResult <- clientRegisteredRequest client regMethod 1 "hi" M.empty
|
|
||||||
case reqResult of
|
|
||||||
Left x -> error $ "Got client error: " ++ show x
|
|
||||||
Right resp -> return ()
|
|
||||||
|
|
||||||
main :: IO ()
|
main = replicateM_ 100 $ run $
|
||||||
main = regClient
|
registered
|
||||||
|
|
|
@ -21,7 +21,7 @@ Flag Debug
|
||||||
flag with-examples
|
flag with-examples
|
||||||
description: Also build example executables.
|
description: Also build example executables.
|
||||||
manual: True
|
manual: True
|
||||||
default: False
|
default: True
|
||||||
|
|
||||||
library
|
library
|
||||||
build-depends:
|
build-depends:
|
||||||
|
|
|
@ -28,6 +28,16 @@ newtype MethodName = MethodName {unMethodName :: String}
|
||||||
newtype Host = Host {unHost :: String}
|
newtype Host = Host {unHost :: String}
|
||||||
deriving (Show, Eq, IsString)
|
deriving (Show, Eq, IsString)
|
||||||
|
|
||||||
|
newtype Port = Port {unPort :: Int}
|
||||||
|
deriving (Eq, Num, Show)
|
||||||
|
|
||||||
|
newtype Endpoint = Endpoint {unEndpoint :: String}
|
||||||
|
deriving (Show, Eq, IsString)
|
||||||
|
|
||||||
|
-- | Given a hostname and port, produces a "host:port" string
|
||||||
|
endpoint :: Host -> Port -> Endpoint
|
||||||
|
endpoint (Host h) (Port p) = Endpoint (h ++ ":" ++ show p)
|
||||||
|
|
||||||
-- | Represents a registered method. Methods can optionally be registered in
|
-- | Represents a registered method. Methods can optionally be registered in
|
||||||
-- order to make the C-level request/response code simpler.
|
-- order to make the C-level request/response code simpler.
|
||||||
-- Before making or awaiting a registered call, the
|
-- Before making or awaiting a registered call, the
|
||||||
|
@ -36,7 +46,7 @@ newtype Host = Host {unHost :: String}
|
||||||
-- Contains state for identifying that method in the underlying gRPC library.
|
-- Contains state for identifying that method in the underlying gRPC library.
|
||||||
data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType,
|
data RegisteredMethod = RegisteredMethod {methodType :: GRPCMethodType,
|
||||||
methodName :: MethodName,
|
methodName :: MethodName,
|
||||||
methodHost :: Host,
|
methodEndpoint :: Endpoint,
|
||||||
methodHandle :: C.CallHandle}
|
methodHandle :: C.CallHandle}
|
||||||
|
|
||||||
-- | Represents one GRPC call (i.e. request) on the client.
|
-- | Represents one GRPC call (i.e. request) on the client.
|
||||||
|
|
|
@ -19,20 +19,22 @@ import Network.GRPC.LowLevel.Op
|
||||||
-- | Represents the context needed to perform client-side gRPC operations.
|
-- | Represents the context needed to perform client-side gRPC operations.
|
||||||
data Client = Client {clientChannel :: C.Channel,
|
data Client = Client {clientChannel :: C.Channel,
|
||||||
clientCQ :: CompletionQueue,
|
clientCQ :: CompletionQueue,
|
||||||
clientHostPort :: String}
|
clientConfig :: ClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
-- | Configuration necessary to set up a client.
|
-- | Configuration necessary to set up a client.
|
||||||
data ClientConfig = ClientConfig {clientServerHost :: Host,
|
data ClientConfig = ClientConfig {serverHost :: Host,
|
||||||
clientServerPort :: Int}
|
serverPort :: Port}
|
||||||
|
|
||||||
|
clientEndpoint :: ClientConfig -> Endpoint
|
||||||
|
clientEndpoint ClientConfig{..} = endpoint serverHost serverPort
|
||||||
|
|
||||||
createClient :: GRPC -> ClientConfig -> IO Client
|
createClient :: GRPC -> ClientConfig -> IO Client
|
||||||
createClient grpc conf@ClientConfig{..} = do
|
createClient grpc clientConfig = do
|
||||||
let clientHostPort = (unHost clientServerHost)
|
let Endpoint e = clientEndpoint clientConfig
|
||||||
++ ":"
|
clientChannel <- C.grpcInsecureChannelCreate e nullPtr C.reserved
|
||||||
++ (show clientServerPort)
|
|
||||||
clientChannel <- C.grpcInsecureChannelCreate clientHostPort nullPtr C.reserved
|
|
||||||
clientCQ <- createCompletionQueue grpc
|
clientCQ <- createCompletionQueue grpc
|
||||||
return $ Client{..}
|
return Client{..}
|
||||||
|
|
||||||
destroyClient :: Client -> IO ()
|
destroyClient :: Client -> IO ()
|
||||||
destroyClient Client{..} = do
|
destroyClient Client{..} = do
|
||||||
|
@ -59,12 +61,11 @@ clientRegisterMethod :: Client
|
||||||
-- ^ method name, e.g. "/foo"
|
-- ^ method name, e.g. "/foo"
|
||||||
-> GRPCMethodType
|
-> GRPCMethodType
|
||||||
-> IO RegisteredMethod
|
-> IO RegisteredMethod
|
||||||
clientRegisterMethod Client{..} name Normal = do
|
clientRegisterMethod Client{..} meth Normal = do
|
||||||
|
let e = clientEndpoint clientConfig
|
||||||
handle <- C.grpcChannelRegisterCall clientChannel
|
handle <- C.grpcChannelRegisterCall clientChannel
|
||||||
(unMethodName name)
|
(unMethodName meth) (unEndpoint e) C.reserved
|
||||||
clientHostPort
|
return $ RegisteredMethod Normal meth e handle
|
||||||
C.reserved
|
|
||||||
return $ RegisteredMethod Normal name (Host clientHostPort) handle
|
|
||||||
clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented."
|
clientRegisterMethod _ _ _ = error "Streaming methods not yet implemented."
|
||||||
|
|
||||||
-- | Create a new call on the client for a registered method.
|
-- | Create a new call on the client for a registered method.
|
||||||
|
@ -97,23 +98,22 @@ withClientRegisteredCall client regmethod timeout f = do
|
||||||
-- method registration machinery. In practice, we'll probably only use the
|
-- method registration machinery. In practice, we'll probably only use the
|
||||||
-- registered method version, but we include this for completeness and testing.
|
-- registered method version, but we include this for completeness and testing.
|
||||||
clientCreateCall :: Client
|
clientCreateCall :: Client
|
||||||
-> MethodName
|
-> MethodName
|
||||||
-- ^ The method name
|
-> TimeoutSeconds
|
||||||
-> Host
|
-> IO (Either GRPCIOError ClientCall)
|
||||||
-- ^ The host.
|
clientCreateCall Client{..} meth timeout = do
|
||||||
-> TimeoutSeconds
|
|
||||||
-> IO (Either GRPCIOError ClientCall)
|
|
||||||
clientCreateCall Client{..} method host timeout = do
|
|
||||||
let parentCall = C.Call nullPtr
|
let parentCall = C.Call nullPtr
|
||||||
C.withDeadlineSeconds timeout $ \deadline -> do
|
C.withDeadlineSeconds timeout $ \deadline -> do
|
||||||
channelCreateCall clientChannel parentCall C.propagateDefaults
|
channelCreateCall clientChannel parentCall C.propagateDefaults
|
||||||
clientCQ method host deadline
|
clientCQ meth (clientEndpoint clientConfig) deadline
|
||||||
|
|
||||||
withClientCall :: Client -> MethodName -> Host -> TimeoutSeconds
|
withClientCall :: Client
|
||||||
-> (ClientCall -> IO (Either GRPCIOError a))
|
-> MethodName
|
||||||
-> IO (Either GRPCIOError a)
|
-> TimeoutSeconds
|
||||||
withClientCall client method host timeout f = do
|
-> (ClientCall -> IO (Either GRPCIOError a))
|
||||||
createResult <- clientCreateCall client method host timeout
|
-> IO (Either GRPCIOError a)
|
||||||
|
withClientCall client method timeout f = do
|
||||||
|
createResult <- clientCreateCall client method timeout
|
||||||
case createResult of
|
case createResult of
|
||||||
Left x -> return $ Left x
|
Left x -> return $ Left x
|
||||||
Right call -> f call `finally` logDestroy call
|
Right call -> f call `finally` logDestroy call
|
||||||
|
@ -201,18 +201,16 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
|
||||||
clientRequest :: Client
|
clientRequest :: Client
|
||||||
-> MethodName
|
-> MethodName
|
||||||
-- ^ Method name, e.g. "/foo"
|
-- ^ Method name, e.g. "/foo"
|
||||||
-> Host
|
|
||||||
-- ^ Host. Not sure if used.
|
|
||||||
-> TimeoutSeconds
|
-> TimeoutSeconds
|
||||||
|
-- ^ "Number of seconds until request times out"
|
||||||
-> ByteString
|
-> ByteString
|
||||||
-- ^ Request body.
|
-- ^ Request body.
|
||||||
-> MetadataMap
|
-> MetadataMap
|
||||||
-- ^ Request metadata.
|
-- ^ Request metadata.
|
||||||
-> IO (Either GRPCIOError NormalRequestResult)
|
-> IO (Either GRPCIOError NormalRequestResult)
|
||||||
clientRequest client@(Client{..}) (MethodName method) (Host host)
|
clientRequest client@Client{..} meth timeLimit body meta =
|
||||||
timeLimit body meta =
|
fmap join $ do
|
||||||
fmap join $
|
withClientCall client meth timeLimit $ \call -> do
|
||||||
withClientCall client (MethodName method) (Host host) timeLimit $ \call -> do
|
|
||||||
let ops = clientNormalRequestOps body meta
|
let ops = clientNormalRequestOps body meta
|
||||||
results <- runClientOps call clientCQ ops timeLimit
|
results <- runClientOps call clientCQ ops timeLimit
|
||||||
grpcDebug "clientRequest: ops ran."
|
grpcDebug "clientRequest: ops ran."
|
||||||
|
|
|
@ -237,15 +237,18 @@ channelCreateRegisteredCall
|
||||||
handle deadline C.reserved
|
handle deadline C.reserved
|
||||||
return $ Right $ ClientCall call
|
return $ Right $ ClientCall call
|
||||||
|
|
||||||
channelCreateCall :: C.Channel -> C.Call -> C.PropagationMask -> CompletionQueue
|
channelCreateCall :: C.Channel
|
||||||
-> MethodName -> Host -> C.CTimeSpecPtr
|
-> C.Call
|
||||||
-> IO (Either GRPCIOError ClientCall)
|
-> C.PropagationMask
|
||||||
channelCreateCall
|
-> CompletionQueue
|
||||||
chan parent mask cq@CompletionQueue{..} (MethodName methodName) (Host host)
|
-> MethodName
|
||||||
deadline =
|
-> Endpoint
|
||||||
|
-> C.CTimeSpecPtr
|
||||||
|
-> IO (Either GRPCIOError ClientCall)
|
||||||
|
channelCreateCall chan parent mask cq@CompletionQueue{..} meth endpt deadline =
|
||||||
withPermission Push cq $ do
|
withPermission Push cq $ do
|
||||||
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ methodName host
|
call <- C.grpcChannelCreateCall chan parent mask unsafeCQ
|
||||||
deadline C.reserved
|
(unMethodName meth) (unEndpoint endpt) deadline C.reserved
|
||||||
return $ Right $ ClientCall call
|
return $ Right $ ClientCall call
|
||||||
|
|
||||||
-- | Create the call object to handle a registered call.
|
-- | Create the call object to handle a registered call.
|
||||||
|
|
|
@ -24,41 +24,47 @@ import Network.GRPC.LowLevel.GRPC
|
||||||
import Network.GRPC.LowLevel.Op
|
import Network.GRPC.LowLevel.Op
|
||||||
|
|
||||||
-- | Wraps various gRPC state needed to run a server.
|
-- | Wraps various gRPC state needed to run a server.
|
||||||
data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue,
|
data Server = Server
|
||||||
registeredMethods :: [RegisteredMethod]}
|
{ internalServer :: C.Server
|
||||||
|
, serverCQ :: CompletionQueue
|
||||||
|
, registeredMethods :: [RegisteredMethod]
|
||||||
|
, serverConfig :: ServerConfig
|
||||||
|
}
|
||||||
|
|
||||||
-- | Configuration needed to start a server. There might be more fields that
|
-- | Configuration needed to start a server.
|
||||||
-- need to be added to this in the future.
|
data ServerConfig = ServerConfig
|
||||||
data ServerConfig =
|
{ host :: Host
|
||||||
ServerConfig {hostName :: Host,
|
-- ^ Name of the host the server is running on. Not sure how this is
|
||||||
-- ^ Name of the host the server is running on. Not sure
|
-- used. Setting to "localhost" works fine in tests.
|
||||||
-- how this is used. Setting to "localhost" works fine in tests.
|
, port :: Port
|
||||||
port :: Int,
|
-- ^ Port on which to listen for requests.
|
||||||
-- ^ Port to listen for requests on.
|
, methodsToRegister :: [(MethodName, GRPCMethodType)]
|
||||||
methodsToRegister :: [(MethodName, GRPCMethodType)]
|
-- ^ List of (method name, method type) tuples specifying all methods to
|
||||||
-- ^ List of (method name, method host, method type) tuples
|
-- register. You can also handle other unregistered methods with
|
||||||
-- specifying all methods to register. You can also handle
|
-- `serverHandleNormalCall`.
|
||||||
-- other unregistered methods with `serverHandleNormalCall`.
|
}
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
serverEndpoint :: ServerConfig -> Endpoint
|
||||||
|
serverEndpoint ServerConfig{..} = endpoint host port
|
||||||
|
|
||||||
startServer :: GRPC -> ServerConfig -> IO Server
|
startServer :: GRPC -> ServerConfig -> IO Server
|
||||||
startServer grpc ServerConfig{..} = do
|
startServer grpc conf@ServerConfig{..} = do
|
||||||
|
let e = serverEndpoint conf
|
||||||
server <- C.grpcServerCreate nullPtr C.reserved
|
server <- C.grpcServerCreate nullPtr C.reserved
|
||||||
let hostPort = (unHost hostName) ++ ":" ++ (show port)
|
actualPort <- C.grpcServerAddInsecureHttp2Port server (unEndpoint e)
|
||||||
actualPort <- C.grpcServerAddInsecureHttp2Port server hostPort
|
when (actualPort /= unPort port) $
|
||||||
when (actualPort /= port) (error $ "Unable to bind port: " ++ (show port))
|
error $ "Unable to bind port: " ++ show port
|
||||||
cq <- createCompletionQueue grpc
|
cq <- createCompletionQueue grpc
|
||||||
serverRegisterCompletionQueue server cq
|
serverRegisterCompletionQueue server cq
|
||||||
methods <- forM methodsToRegister $
|
methods <- forM methodsToRegister $ \(name, mtype) ->
|
||||||
\(name, mtype) ->
|
serverRegisterMethod server name e mtype
|
||||||
serverRegisterMethod server name (Host hostPort) mtype
|
|
||||||
C.grpcServerStart server
|
C.grpcServerStart server
|
||||||
return $ Server server cq methods
|
return $ Server server cq methods conf
|
||||||
|
|
||||||
stopServer :: Server -> IO ()
|
stopServer :: Server -> IO ()
|
||||||
-- TODO: Do method handles need to be freed?
|
-- TODO: Do method handles need to be freed?
|
||||||
stopServer (Server server cq _) = do
|
stopServer (Server server cq _ _) = do
|
||||||
grpcDebug "stopServer: calling shutdownNotify."
|
grpcDebug "stopServer: calling shutdownNotify."
|
||||||
shutdownNotify
|
shutdownNotify
|
||||||
grpcDebug "stopServer: cancelling all calls."
|
grpcDebug "stopServer: cancelling all calls."
|
||||||
|
@ -94,25 +100,24 @@ withServer grpc cfg f = bracket (startServer grpc cfg) stopServer f
|
||||||
-- the server is started, so we do it during startup according to the
|
-- the server is started, so we do it during startup according to the
|
||||||
-- 'ServerConfig'.
|
-- 'ServerConfig'.
|
||||||
serverRegisterMethod :: C.Server
|
serverRegisterMethod :: C.Server
|
||||||
-> MethodName
|
-> MethodName
|
||||||
-- ^ method name, e.g. "/foo"
|
-- ^ method name, e.g. "/foo"
|
||||||
-> Host
|
-> Endpoint
|
||||||
-- ^ host name, e.g. "localhost". I have no idea
|
-- ^ Endpoint name name, e.g. "localhost:9999". I have no
|
||||||
-- why this is needed since we have to supply a host
|
-- idea why this is needed since we have to provide these
|
||||||
-- name to start a server in the first place. It doesn't
|
-- parameters to start a server in the first place. It
|
||||||
-- seem to have any effect, even if it's filled with
|
-- doesn't seem to have any effect, even if it's filled
|
||||||
-- nonsense.
|
-- with nonsense.
|
||||||
-> GRPCMethodType
|
-> GRPCMethodType
|
||||||
-- ^ Type of method this will be. In the future, this
|
-- ^ Type of method this will be. In the future, this will
|
||||||
-- will be used to switch to the correct handling logic.
|
-- be used to switch to the correct handling logic.
|
||||||
-- Currently, the only valid choice is 'Normal'.
|
-- Currently, the only valid choice is 'Normal'.
|
||||||
-> IO RegisteredMethod
|
-> IO RegisteredMethod
|
||||||
serverRegisterMethod internalServer name host Normal = do
|
serverRegisterMethod internalServer meth e Normal = do
|
||||||
handle <- C.grpcServerRegisterMethod internalServer
|
handle <- C.grpcServerRegisterMethod internalServer
|
||||||
(unMethodName name)
|
(unMethodName meth) (unEndpoint e)
|
||||||
(unHost host)
|
|
||||||
grpcDebug $ "registered method to handle " ++ show handle
|
grpcDebug $ "registered method to handle " ++ show handle
|
||||||
return $ RegisteredMethod Normal name host handle
|
return $ RegisteredMethod Normal meth e handle
|
||||||
serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
|
serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
|
||||||
|
|
||||||
-- | Create a 'Call' with which to wait for the invocation of a registered
|
-- | Create a 'Call' with which to wait for the invocation of a registered
|
||||||
|
|
|
@ -78,7 +78,7 @@ payloadLowLevelClient = TestClient $ \grpc ->
|
||||||
payloadLowLevelClientUnregistered :: TestClient
|
payloadLowLevelClientUnregistered :: TestClient
|
||||||
payloadLowLevelClientUnregistered = TestClient $ \grpc -> do
|
payloadLowLevelClientUnregistered = TestClient $ \grpc -> do
|
||||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||||
reqResult <- clientRequest client "/foo" "localhost" 10 "Hello!" M.empty
|
reqResult <- clientRequest client "/foo" 10 "Hello!" M.empty
|
||||||
case reqResult of
|
case reqResult of
|
||||||
Left x -> error $ "Client got error: " ++ show x
|
Left x -> error $ "Client got error: " ++ show x
|
||||||
Right (NormalRequestResult
|
Right (NormalRequestResult
|
||||||
|
@ -166,7 +166,7 @@ testWithClientCall =
|
||||||
grpcTest "Client - Create/destroy call" $ \grpc -> do
|
grpcTest "Client - Create/destroy call" $ \grpc -> do
|
||||||
let conf = ClientConfig "localhost" 50051
|
let conf = ClientConfig "localhost" 50051
|
||||||
withClient grpc conf $ \client -> do
|
withClient grpc conf $ \client -> do
|
||||||
result <- withClientCall client "foo" "localhost" 10 $
|
result <- withClientCall client "foo" 10 $
|
||||||
const $ return $ Right ()
|
const $ return $ Right ()
|
||||||
case result of
|
case result of
|
||||||
Left err -> error $ show err
|
Left err -> error $ show err
|
||||||
|
|
Loading…
Reference in a new issue