mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2025-02-17 05:35:02 +01:00
Channel options: user agents and compression (#32)
* get_peer: fix todo * add documentation for server registered call function * test roundtrip conversion of larger bytestrings (32 mb) * Add channel args interface: currently supports user agents and compression. * fix build failure after stack clean
This commit is contained in:
parent
08b22d198a
commit
a530faf912
15 changed files with 310 additions and 51 deletions
|
@ -2,6 +2,7 @@
|
||||||
#include <grpc/byte_buffer.h>
|
#include <grpc/byte_buffer.h>
|
||||||
#include <grpc/byte_buffer_reader.h>
|
#include <grpc/byte_buffer_reader.h>
|
||||||
#include <grpc/impl/codegen/grpc_types.h>
|
#include <grpc/impl/codegen/grpc_types.h>
|
||||||
|
#include <grpc/impl/codegen/compression_types.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
@ -408,3 +409,50 @@ void* grpc_server_register_method_(grpc_server* server, const char* method,
|
||||||
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
|
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
|
||||||
0);
|
0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
grpc_arg* create_arg_array(size_t n){
|
||||||
|
return malloc(sizeof(grpc_arg)*n);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Converts our enum into real GRPC #defines. c2hs workaround.
|
||||||
|
char* translate_arg_key(enum supported_arg_key key){
|
||||||
|
switch (key) {
|
||||||
|
case compression_algorithm_key:
|
||||||
|
return GRPC_COMPRESSION_ALGORITHM_ARG;
|
||||||
|
case user_agent_prefix_key:
|
||||||
|
return GRPC_ARG_PRIMARY_USER_AGENT_STRING;
|
||||||
|
case user_agent_suffix_key:
|
||||||
|
return GRPC_ARG_SECONDARY_USER_AGENT_STRING;
|
||||||
|
default:
|
||||||
|
return "unknown_arg_key";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void create_string_arg(grpc_arg* args, size_t i,
|
||||||
|
enum supported_arg_key key, char* value){
|
||||||
|
grpc_arg* arg = args+i;
|
||||||
|
arg->type = GRPC_ARG_STRING;
|
||||||
|
arg->key = translate_arg_key(key);
|
||||||
|
char* storeValue = malloc(sizeof(char)*strlen(value));
|
||||||
|
arg->value.string = strcpy(storeValue, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
void create_int_arg(grpc_arg* args, size_t i,
|
||||||
|
enum supported_arg_key key, int value){
|
||||||
|
grpc_arg* arg = args+i;
|
||||||
|
arg->type = GRPC_ARG_INTEGER;
|
||||||
|
arg->key = translate_arg_key(key);
|
||||||
|
arg->value.integer = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Destroys an arg array of the given length. NOTE: the args in the arg array
|
||||||
|
//MUST have been created by the create_*_arg functions above!
|
||||||
|
void destroy_arg_array(grpc_arg* args, size_t n){
|
||||||
|
for(int i = 0; i < n; i++){
|
||||||
|
grpc_arg* arg = args+i;
|
||||||
|
if(arg->type == GRPC_ARG_STRING){
|
||||||
|
free(arg->value.string);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(args);
|
||||||
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ registered c = do
|
||||||
meth <- clientRegisterMethod c echoMethod Normal
|
meth <- clientRegisterMethod c echoMethod Normal
|
||||||
clientRequest c meth 1 "hi" mempty
|
clientRequest c meth 1 "hi" mempty
|
||||||
|
|
||||||
run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051) $ \c ->
|
run f = withGRPC $ \g -> withClient g (ClientConfig "localhost" 50051 []) $ \c ->
|
||||||
f c >>= \case
|
f c >>= \case
|
||||||
Left e -> error $ "Got client error: " ++ show e
|
Left e -> error $ "Got client error: " ++ show e
|
||||||
_ -> return ()
|
_ -> return ()
|
||||||
|
|
|
@ -24,7 +24,7 @@ handler U.ServerCall{..} reqBody = do
|
||||||
|
|
||||||
unregMain :: IO ()
|
unregMain :: IO ()
|
||||||
unregMain = withGRPC $ \grpc -> do
|
unregMain = withGRPC $ \grpc -> do
|
||||||
withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> forever $ do
|
withServer grpc (ServerConfig "localhost" 50051 [] []) $ \server -> forever $ do
|
||||||
result <- U.serverHandleNormalCall server serverMeta handler
|
result <- U.serverHandleNormalCall server serverMeta handler
|
||||||
case result of
|
case result of
|
||||||
Left x -> putStrLn $ "handle call result error: " ++ show x
|
Left x -> putStrLn $ "handle call result error: " ++ show x
|
||||||
|
@ -33,7 +33,7 @@ unregMain = withGRPC $ \grpc -> do
|
||||||
regMain :: IO ()
|
regMain :: IO ()
|
||||||
regMain = withGRPC $ \grpc -> do
|
regMain = withGRPC $ \grpc -> do
|
||||||
let methods = [(MethodName "/echo.Echo/DoEcho", Normal)]
|
let methods = [(MethodName "/echo.Echo/DoEcho", Normal)]
|
||||||
withServer grpc (ServerConfig "localhost" 50051 methods) $ \server ->
|
withServer grpc (ServerConfig "localhost" 50051 methods []) $ \server ->
|
||||||
forever $ do
|
forever $ do
|
||||||
let method = head (registeredMethods server)
|
let method = head (registeredMethods server)
|
||||||
result <- serverHandleNormalCall server method serverMeta $
|
result <- serverHandleNormalCall server method serverMeta $
|
||||||
|
@ -57,7 +57,7 @@ regMainThreaded :: IO ()
|
||||||
regMainThreaded = do
|
regMainThreaded = do
|
||||||
withGRPC $ \grpc -> do
|
withGRPC $ \grpc -> do
|
||||||
let methods = [(MethodName "/echo.Echo/DoEcho", Normal)]
|
let methods = [(MethodName "/echo.Echo/DoEcho", Normal)]
|
||||||
withServer grpc (ServerConfig "localhost" 50051 methods) $ \server -> do
|
withServer grpc (ServerConfig "localhost" 50051 methods []) $ \server -> do
|
||||||
let method = head (registeredMethods server)
|
let method = head (registeredMethods server)
|
||||||
tid1 <- async $ regLoop server method
|
tid1 <- async $ regLoop server method
|
||||||
tid2 <- async $ regLoop server method
|
tid2 <- async $ regLoop server method
|
||||||
|
|
|
@ -37,6 +37,7 @@ library
|
||||||
Network.GRPC.Unsafe.Constants
|
Network.GRPC.Unsafe.Constants
|
||||||
Network.GRPC.Unsafe.Time
|
Network.GRPC.Unsafe.Time
|
||||||
Network.GRPC.Unsafe.Slice
|
Network.GRPC.Unsafe.Slice
|
||||||
|
Network.GRPC.Unsafe.ChannelArgs
|
||||||
Network.GRPC.Unsafe.ByteBuffer
|
Network.GRPC.Unsafe.ByteBuffer
|
||||||
Network.GRPC.Unsafe.Metadata
|
Network.GRPC.Unsafe.Metadata
|
||||||
Network.GRPC.Unsafe.Op
|
Network.GRPC.Unsafe.Op
|
||||||
|
|
|
@ -140,4 +140,23 @@ gpr_timespec* call_details_get_deadline(grpc_call_details* details);
|
||||||
void* grpc_server_register_method_(grpc_server* server, const char* method,
|
void* grpc_server_register_method_(grpc_server* server, const char* method,
|
||||||
const char* host);
|
const char* host);
|
||||||
|
|
||||||
|
//c2hs doesn't support #const pragmas referring to #define'd strings, so we use
|
||||||
|
//this enum as a workaround. These are converted into actual GRPC #defines in
|
||||||
|
// translate_arg_key in grpc_haskell.c.
|
||||||
|
enum supported_arg_key {
|
||||||
|
compression_algorithm_key = 0,
|
||||||
|
user_agent_prefix_key,
|
||||||
|
user_agent_suffix_key
|
||||||
|
};
|
||||||
|
|
||||||
|
grpc_arg* create_arg_array(size_t n);
|
||||||
|
|
||||||
|
void create_string_arg(grpc_arg* args, size_t i,
|
||||||
|
enum supported_arg_key key, char* value);
|
||||||
|
|
||||||
|
void create_int_arg(grpc_arg* args, size_t i,
|
||||||
|
enum supported_arg_key key, int value);
|
||||||
|
|
||||||
|
void destroy_arg_array(grpc_arg* args, size_t n);
|
||||||
|
|
||||||
#endif //GRPC_HASKELL
|
#endif //GRPC_HASKELL
|
||||||
|
|
|
@ -26,6 +26,10 @@ GRPC
|
||||||
, MethodName(..)
|
, MethodName(..)
|
||||||
, StatusDetails(..)
|
, StatusDetails(..)
|
||||||
|
|
||||||
|
-- * Configuration options
|
||||||
|
, Arg(..)
|
||||||
|
, CompressionAlgorithm(..)
|
||||||
|
|
||||||
-- * Server
|
-- * Server
|
||||||
, ServerConfig(..)
|
, ServerConfig(..)
|
||||||
, Server
|
, Server
|
||||||
|
@ -64,3 +68,4 @@ import Network.GRPC.LowLevel.Call
|
||||||
|
|
||||||
import Network.GRPC.Unsafe (ConnectivityState(..))
|
import Network.GRPC.Unsafe (ConnectivityState(..))
|
||||||
import Network.GRPC.Unsafe.Op (StatusCode(..))
|
import Network.GRPC.Unsafe.Op (StatusCode(..))
|
||||||
|
import Network.GRPC.Unsafe.ChannelArgs(Arg(..), CompressionAlgorithm(..))
|
||||||
|
|
|
@ -10,10 +10,12 @@ import Control.Monad (join)
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString)
|
||||||
import Foreign.Ptr (nullPtr)
|
import Foreign.Ptr (nullPtr)
|
||||||
import qualified Network.GRPC.Unsafe as C
|
import qualified Network.GRPC.Unsafe as C
|
||||||
|
import qualified Network.GRPC.Unsafe.ChannelArgs as C
|
||||||
import qualified Network.GRPC.Unsafe.Constants as C
|
import qualified Network.GRPC.Unsafe.Constants as C
|
||||||
import qualified Network.GRPC.Unsafe.Op as C
|
import qualified Network.GRPC.Unsafe.Op as C
|
||||||
import qualified Network.GRPC.Unsafe.Time as C
|
import qualified Network.GRPC.Unsafe.Time as C
|
||||||
|
|
||||||
|
|
||||||
import Network.GRPC.LowLevel.Call
|
import Network.GRPC.LowLevel.Call
|
||||||
import Network.GRPC.LowLevel.CompletionQueue
|
import Network.GRPC.LowLevel.CompletionQueue
|
||||||
import Network.GRPC.LowLevel.GRPC
|
import Network.GRPC.LowLevel.GRPC
|
||||||
|
@ -27,17 +29,24 @@ data Client = Client {clientChannel :: C.Channel,
|
||||||
|
|
||||||
-- | Configuration necessary to set up a client.
|
-- | Configuration necessary to set up a client.
|
||||||
data ClientConfig = ClientConfig {serverHost :: Host,
|
data ClientConfig = ClientConfig {serverHost :: Host,
|
||||||
serverPort :: Port}
|
serverPort :: Port,
|
||||||
|
clientArgs :: [C.Arg]
|
||||||
|
-- ^ Optional arguments for setting up the
|
||||||
|
-- channel on the client. Supplying an empty
|
||||||
|
-- list will cause the channel to use gRPC's
|
||||||
|
-- default options.
|
||||||
|
}
|
||||||
|
|
||||||
clientEndpoint :: ClientConfig -> Endpoint
|
clientEndpoint :: ClientConfig -> Endpoint
|
||||||
clientEndpoint ClientConfig{..} = endpoint serverHost serverPort
|
clientEndpoint ClientConfig{..} = endpoint serverHost serverPort
|
||||||
|
|
||||||
createClient :: GRPC -> ClientConfig -> IO Client
|
createClient :: GRPC -> ClientConfig -> IO Client
|
||||||
createClient grpc clientConfig = do
|
createClient grpc clientConfig =
|
||||||
let Endpoint e = clientEndpoint clientConfig
|
C.withChannelArgs (clientArgs clientConfig) $ \chanargs -> do
|
||||||
clientChannel <- C.grpcInsecureChannelCreate e nullPtr C.reserved
|
let Endpoint e = clientEndpoint clientConfig
|
||||||
clientCQ <- createCompletionQueue grpc
|
clientChannel <- C.grpcInsecureChannelCreate e chanargs C.reserved
|
||||||
return Client{..}
|
clientCQ <- createCompletionQueue grpc
|
||||||
|
return Client{..}
|
||||||
|
|
||||||
destroyClient :: Client -> IO ()
|
destroyClient :: Client -> IO ()
|
||||||
destroyClient Client{..} = do
|
destroyClient Client{..} = do
|
||||||
|
|
|
@ -22,6 +22,7 @@ import Network.GRPC.LowLevel.GRPC
|
||||||
import Network.GRPC.LowLevel.Op
|
import Network.GRPC.LowLevel.Op
|
||||||
import qualified Network.GRPC.Unsafe as C
|
import qualified Network.GRPC.Unsafe as C
|
||||||
import qualified Network.GRPC.Unsafe.Op as C
|
import qualified Network.GRPC.Unsafe.Op as C
|
||||||
|
import qualified Network.GRPC.Unsafe.ChannelArgs as C
|
||||||
|
|
||||||
-- | Wraps various gRPC state needed to run a server.
|
-- | Wraps various gRPC state needed to run a server.
|
||||||
data Server = Server
|
data Server = Server
|
||||||
|
@ -42,6 +43,11 @@ data ServerConfig = ServerConfig
|
||||||
-- ^ List of (method name, method type) tuples specifying all methods to
|
-- ^ List of (method name, method type) tuples specifying all methods to
|
||||||
-- register. You can also handle other unregistered methods with
|
-- register. You can also handle other unregistered methods with
|
||||||
-- `serverHandleNormalCall`.
|
-- `serverHandleNormalCall`.
|
||||||
|
, serverArgs :: [C.Arg]
|
||||||
|
-- ^ Optional arguments for setting up the
|
||||||
|
-- channel on the server. Supplying an empty
|
||||||
|
-- list will cause the channel to use gRPC's
|
||||||
|
-- default options.
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
@ -49,18 +55,19 @@ serverEndpoint :: ServerConfig -> Endpoint
|
||||||
serverEndpoint ServerConfig{..} = endpoint host port
|
serverEndpoint ServerConfig{..} = endpoint host port
|
||||||
|
|
||||||
startServer :: GRPC -> ServerConfig -> IO Server
|
startServer :: GRPC -> ServerConfig -> IO Server
|
||||||
startServer grpc conf@ServerConfig{..} = do
|
startServer grpc conf@ServerConfig{..} =
|
||||||
let e = serverEndpoint conf
|
C.withChannelArgs serverArgs $ \args -> do
|
||||||
server <- C.grpcServerCreate nullPtr C.reserved
|
let e = serverEndpoint conf
|
||||||
actualPort <- C.grpcServerAddInsecureHttp2Port server (unEndpoint e)
|
server <- C.grpcServerCreate args C.reserved
|
||||||
when (actualPort /= unPort port) $
|
actualPort <- C.grpcServerAddInsecureHttp2Port server (unEndpoint e)
|
||||||
error $ "Unable to bind port: " ++ show port
|
when (actualPort /= unPort port) $
|
||||||
cq <- createCompletionQueue grpc
|
error $ "Unable to bind port: " ++ show port
|
||||||
serverRegisterCompletionQueue server cq
|
cq <- createCompletionQueue grpc
|
||||||
methods <- forM methodsToRegister $ \(name, mtype) ->
|
serverRegisterCompletionQueue server cq
|
||||||
serverRegisterMethod server name e mtype
|
methods <- forM methodsToRegister $ \(name, mtype) ->
|
||||||
C.grpcServerStart server
|
serverRegisterMethod server name e mtype
|
||||||
return $ Server server cq methods conf
|
C.grpcServerStart server
|
||||||
|
return $ Server server cq methods conf
|
||||||
|
|
||||||
stopServer :: Server -> IO ()
|
stopServer :: Server -> IO ()
|
||||||
-- TODO: Do method handles need to be freed?
|
-- TODO: Do method handles need to be freed?
|
||||||
|
|
|
@ -5,6 +5,7 @@ module Network.GRPC.Unsafe where
|
||||||
import Control.Exception (bracket)
|
import Control.Exception (bracket)
|
||||||
import Control.Monad
|
import Control.Monad
|
||||||
|
|
||||||
|
import Foreign.C.String (CString, peekCString)
|
||||||
import Foreign.C.Types
|
import Foreign.C.Types
|
||||||
import Foreign.Marshal.Alloc (free)
|
import Foreign.Marshal.Alloc (free)
|
||||||
import Foreign.Ptr
|
import Foreign.Ptr
|
||||||
|
@ -15,9 +16,11 @@ import Network.GRPC.Unsafe.Constants
|
||||||
{#import Network.GRPC.Unsafe.ByteBuffer#}
|
{#import Network.GRPC.Unsafe.ByteBuffer#}
|
||||||
{#import Network.GRPC.Unsafe.Op#}
|
{#import Network.GRPC.Unsafe.Op#}
|
||||||
{#import Network.GRPC.Unsafe.Metadata#}
|
{#import Network.GRPC.Unsafe.Metadata#}
|
||||||
|
{#import Network.GRPC.Unsafe.ChannelArgs#}
|
||||||
|
|
||||||
#include <grpc/grpc.h>
|
#include <grpc/grpc.h>
|
||||||
#include <grpc/status.h>
|
#include <grpc/status.h>
|
||||||
|
#include <grpc/impl/codegen/alloc.h>
|
||||||
#include <grpc_haskell.h>
|
#include <grpc_haskell.h>
|
||||||
|
|
||||||
{#context prefix = "grpc" #}
|
{#context prefix = "grpc" #}
|
||||||
|
@ -58,17 +61,6 @@ instance Storable Call where
|
||||||
peek p = fmap Call (peek (castPtr p))
|
peek p = fmap Call (peek (castPtr p))
|
||||||
poke p (Call r) = poke (castPtr p) r
|
poke p (Call r) = poke (castPtr p) r
|
||||||
|
|
||||||
-- {#enum grpc_arg_type as ArgType {underscoreToCase} deriving (Eq)#}
|
|
||||||
|
|
||||||
newtype ChannelArgs = ChannelArgs [Arg]
|
|
||||||
|
|
||||||
-- TODO Storable ChannelArgs
|
|
||||||
|
|
||||||
{#pointer *grpc_channel_args as ChannelArgsPtr -> ChannelArgs #}
|
|
||||||
|
|
||||||
data Arg = Arg { argKey :: String, argValue :: ArgValue }
|
|
||||||
data ArgValue = ArgString String | ArgInt Int
|
|
||||||
|
|
||||||
-- | A 'Tag' is an identifier that is used with a 'CompletionQueue' to signal
|
-- | A 'Tag' is an identifier that is used with a 'CompletionQueue' to signal
|
||||||
-- that the corresponding operation has completed.
|
-- that the corresponding operation has completed.
|
||||||
newtype Tag = Tag {unTag :: Ptr ()} deriving (Show, Eq)
|
newtype Tag = Tag {unTag :: Ptr ()} deriving (Show, Eq)
|
||||||
|
@ -184,7 +176,7 @@ castPeek p = do
|
||||||
-- expose any functions for creating channel args, since they are entirely
|
-- expose any functions for creating channel args, since they are entirely
|
||||||
-- undocumented.
|
-- undocumented.
|
||||||
{#fun grpc_insecure_channel_create as ^
|
{#fun grpc_insecure_channel_create as ^
|
||||||
{`String', `ChannelArgsPtr',unReserved `Reserved'} -> `Channel'#}
|
{`String', `GrpcChannelArgs', unReserved `Reserved'} -> `Channel'#}
|
||||||
|
|
||||||
{#fun grpc_channel_register_call as ^
|
{#fun grpc_channel_register_call as ^
|
||||||
{`Channel', `String', `String',unReserved `Reserved'}
|
{`Channel', `String', `String',unReserved `Reserved'}
|
||||||
|
@ -226,13 +218,21 @@ castPeek p = do
|
||||||
|
|
||||||
{#fun grpc_call_destroy as ^ {`Call'} -> `()'#}
|
{#fun grpc_call_destroy as ^ {`Call'} -> `()'#}
|
||||||
|
|
||||||
--TODO: we need to free this string with gpr_free!
|
-- | Gets the peer of the current call as a string.
|
||||||
{#fun grpc_call_get_peer as ^ {`Call'} -> `String' #}
|
{#fun grpc_call_get_peer as ^ {`Call'} -> `String' getPeerPeek* #}
|
||||||
|
|
||||||
|
{#fun gpr_free as ^ {`Ptr ()'} -> `()'#}
|
||||||
|
|
||||||
|
getPeerPeek :: CString -> IO String
|
||||||
|
getPeerPeek cstr = do
|
||||||
|
haskellStr <- peekCString cstr
|
||||||
|
gprFree (castPtr cstr)
|
||||||
|
return haskellStr
|
||||||
|
|
||||||
-- Server stuff
|
-- Server stuff
|
||||||
|
|
||||||
{#fun grpc_server_create as ^
|
{#fun grpc_server_create as ^
|
||||||
{`ChannelArgsPtr',unReserved `Reserved'} -> `Server'#}
|
{`GrpcChannelArgs',unReserved `Reserved'} -> `Server'#}
|
||||||
|
|
||||||
{#fun grpc_server_register_method_ as ^
|
{#fun grpc_server_register_method_ as ^
|
||||||
{`Server', `String', `String'} -> `CallHandle' CallHandle#}
|
{`Server', `String', `String'} -> `CallHandle' CallHandle#}
|
||||||
|
@ -269,7 +269,9 @@ castPeek p = do
|
||||||
`CompletionQueue', `CompletionQueue',unTag `Tag'}
|
`CompletionQueue', `CompletionQueue',unTag `Tag'}
|
||||||
-> `CallError'#}
|
-> `CallError'#}
|
||||||
|
|
||||||
-- | TODO: I am not yet sure how this function is supposed to be used.
|
-- | Request a registered call for the given registered method described by a
|
||||||
|
-- 'CallHandle'. The call, deadline, metadata array, and byte buffer are all
|
||||||
|
-- out parameters.
|
||||||
{#fun grpc_server_request_registered_call as ^
|
{#fun grpc_server_request_registered_call as ^
|
||||||
{`Server',unCallHandle `CallHandle',id `Ptr Call', `CTimeSpecPtr',
|
{`Server',unCallHandle `CallHandle',id `Ptr Call', `CTimeSpecPtr',
|
||||||
`MetadataArray', id `Ptr ByteBuffer', `CompletionQueue',
|
`MetadataArray', id `Ptr ByteBuffer', `CompletionQueue',
|
||||||
|
|
|
@ -10,15 +10,13 @@ module Network.GRPC.Unsafe.ByteBuffer where
|
||||||
#include <grpc_haskell.h>
|
#include <grpc_haskell.h>
|
||||||
|
|
||||||
{#import Network.GRPC.Unsafe.Slice#}
|
{#import Network.GRPC.Unsafe.Slice#}
|
||||||
|
{#import Network.GRPC.Unsafe.ChannelArgs#}
|
||||||
import Control.Exception (bracket)
|
import Control.Exception (bracket)
|
||||||
import qualified Data.ByteString as B
|
import qualified Data.ByteString as B
|
||||||
import Foreign.Ptr
|
import Foreign.Ptr
|
||||||
import Foreign.C.Types
|
import Foreign.C.Types
|
||||||
import Foreign.Storable
|
import Foreign.Storable
|
||||||
|
|
||||||
{#enum grpc_compression_algorithm as GRPCCompressionAlgorithm
|
|
||||||
{underscoreToCase} deriving (Eq) #}
|
|
||||||
|
|
||||||
-- | Represents a pointer to a gRPC byte buffer containing 1 or more 'Slice's.
|
-- | Represents a pointer to a gRPC byte buffer containing 1 or more 'Slice's.
|
||||||
-- Must be destroyed manually with 'grpcByteBufferDestroy'.
|
-- Must be destroyed manually with 'grpcByteBufferDestroy'.
|
||||||
{#pointer *grpc_byte_buffer as ByteBuffer newtype #}
|
{#pointer *grpc_byte_buffer as ByteBuffer newtype #}
|
||||||
|
@ -56,7 +54,7 @@ withByteBufferPtr
|
||||||
{#fun grpc_raw_byte_buffer_create as ^ {`Slice', `CULong'} -> `ByteBuffer'#}
|
{#fun grpc_raw_byte_buffer_create as ^ {`Slice', `CULong'} -> `ByteBuffer'#}
|
||||||
|
|
||||||
{#fun grpc_raw_compressed_byte_buffer_create as ^
|
{#fun grpc_raw_compressed_byte_buffer_create as ^
|
||||||
{`Slice', `CULong', `GRPCCompressionAlgorithm'} -> `ByteBuffer'#}
|
{`Slice', `CULong', `CompressionAlgorithm'} -> `ByteBuffer'#}
|
||||||
|
|
||||||
{#fun grpc_byte_buffer_copy as ^ {`ByteBuffer'} -> `ByteBuffer'#}
|
{#fun grpc_byte_buffer_copy as ^ {`ByteBuffer'} -> `ByteBuffer'#}
|
||||||
|
|
||||||
|
@ -78,7 +76,6 @@ withByteBufferPtr
|
||||||
{#fun grpc_raw_byte_buffer_from_reader as ^
|
{#fun grpc_raw_byte_buffer_from_reader as ^
|
||||||
{`ByteBufferReader'} -> `ByteBuffer'#}
|
{`ByteBufferReader'} -> `ByteBuffer'#}
|
||||||
|
|
||||||
-- TODO: Issue #5
|
|
||||||
withByteStringAsByteBuffer :: B.ByteString -> (ByteBuffer -> IO a) -> IO a
|
withByteStringAsByteBuffer :: B.ByteString -> (ByteBuffer -> IO a) -> IO a
|
||||||
withByteStringAsByteBuffer bs f = do
|
withByteStringAsByteBuffer bs f = do
|
||||||
bracket (byteStringToSlice bs) freeSlice $ \slice -> do
|
bracket (byteStringToSlice bs) freeSlice $ \slice -> do
|
||||||
|
|
85
src/Network/GRPC/Unsafe/ChannelArgs.chs
Normal file
85
src/Network/GRPC/Unsafe/ChannelArgs.chs
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
{-# LANGUAGE RecordWildCards #-}
|
||||||
|
|
||||||
|
module Network.GRPC.Unsafe.ChannelArgs where
|
||||||
|
|
||||||
|
import Control.Exception
|
||||||
|
import Control.Monad
|
||||||
|
import Foreign.Storable
|
||||||
|
import Foreign.Ptr (nullPtr)
|
||||||
|
import Foreign.Marshal.Alloc (malloc, free)
|
||||||
|
|
||||||
|
#include <grpc/grpc.h>
|
||||||
|
#include <grpc/status.h>
|
||||||
|
#include <grpc/impl/codegen/alloc.h>
|
||||||
|
#include <grpc/impl/codegen/compression_types.h>
|
||||||
|
#include <grpc_haskell.h>
|
||||||
|
|
||||||
|
{#enum supported_arg_key as ArgKey {underscoreToCase} deriving (Show, Eq)#}
|
||||||
|
|
||||||
|
{#enum grpc_compression_algorithm
|
||||||
|
as CompressionAlgorithm {underscoreToCase} deriving (Show, Eq)#}
|
||||||
|
|
||||||
|
{#enum grpc_compression_level
|
||||||
|
as CompressionLevel {underscoreToCase} deriving (Show, Eq)#}
|
||||||
|
|
||||||
|
{#pointer *grpc_arg as ^#}
|
||||||
|
|
||||||
|
data ChannelArgs = ChannelArgs {channelArgsN :: Int,
|
||||||
|
channelArgsArray :: GrpcArg}
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
{#pointer *grpc_channel_args as ^ -> ChannelArgs#}
|
||||||
|
|
||||||
|
instance Storable ChannelArgs where
|
||||||
|
sizeOf _ = {#sizeof grpc_channel_args#}
|
||||||
|
alignment _ = {#alignof grpc_channel_args#}
|
||||||
|
peek p = ChannelArgs <$> fmap fromIntegral
|
||||||
|
({#get grpc_channel_args->num_args#} p)
|
||||||
|
<*> ({#get grpc_channel_args->args#} p)
|
||||||
|
poke p ChannelArgs{..} = do
|
||||||
|
{#set grpc_channel_args.num_args#} p $ fromIntegral channelArgsN
|
||||||
|
{#set grpc_channel_args.args#} p channelArgsArray
|
||||||
|
|
||||||
|
{#fun create_arg_array as ^ {`Int'} -> `GrpcArg'#}
|
||||||
|
|
||||||
|
data ArgValue = StringArg String | IntArg Int
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
-- | Supported arguments for a channel. More cases will be added as we figure
|
||||||
|
-- out what they are.
|
||||||
|
data Arg = CompressionAlgArg CompressionAlgorithm
|
||||||
|
| UserAgentPrefix String
|
||||||
|
| UserAgentSuffix String
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
{#fun create_string_arg as ^ {`GrpcArg', `Int', `ArgKey', `String'} -> `()'#}
|
||||||
|
|
||||||
|
{#fun create_int_arg as ^ {`GrpcArg', `Int', `ArgKey', `Int'} -> `()'#}
|
||||||
|
|
||||||
|
{#fun destroy_arg_array as ^ {`GrpcArg', `Int'} -> `()'#}
|
||||||
|
|
||||||
|
createArg :: GrpcArg -> Arg -> Int -> IO ()
|
||||||
|
createArg array (CompressionAlgArg alg) i =
|
||||||
|
createIntArg array i CompressionAlgorithmKey (fromEnum alg)
|
||||||
|
createArg array (UserAgentPrefix prefix) i =
|
||||||
|
createStringArg array i UserAgentPrefixKey prefix
|
||||||
|
createArg array (UserAgentSuffix suffix) i =
|
||||||
|
createStringArg array i UserAgentSuffixKey suffix
|
||||||
|
|
||||||
|
createChannelArgs :: [Arg] -> IO GrpcChannelArgs
|
||||||
|
createChannelArgs args = do
|
||||||
|
let l = length args
|
||||||
|
array <- createArgArray l
|
||||||
|
forM_ (zip [0..l] args) $ \(i, arg) -> createArg array arg i
|
||||||
|
ptr <- malloc
|
||||||
|
poke ptr $ ChannelArgs l array
|
||||||
|
return ptr
|
||||||
|
|
||||||
|
destroyChannelArgs :: GrpcChannelArgs -> IO ()
|
||||||
|
destroyChannelArgs ptr =
|
||||||
|
do ChannelArgs{..} <- peek ptr
|
||||||
|
destroyArgArray channelArgsArray channelArgsN
|
||||||
|
free ptr
|
||||||
|
|
||||||
|
withChannelArgs :: [Arg] -> (GrpcChannelArgs -> IO a) -> IO a
|
||||||
|
withChannelArgs args f = bracket (createChannelArgs args) destroyChannelArgs f
|
|
@ -4,6 +4,7 @@ module Network.GRPC.Unsafe.Constants where
|
||||||
|
|
||||||
#include "grpc/grpc.h"
|
#include "grpc/grpc.h"
|
||||||
#include "grpc/impl/codegen/propagation_bits.h"
|
#include "grpc/impl/codegen/propagation_bits.h"
|
||||||
|
#include "grpc/impl/codegen/compression_types.h"
|
||||||
|
|
||||||
argEnableCensus :: Int
|
argEnableCensus :: Int
|
||||||
argEnableCensus = #const GRPC_ARG_ENABLE_CENSUS
|
argEnableCensus = #const GRPC_ARG_ENABLE_CENSUS
|
||||||
|
|
|
@ -8,7 +8,9 @@ module LowLevelTests where
|
||||||
import Control.Concurrent (threadDelay)
|
import Control.Concurrent (threadDelay)
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Monad
|
import Control.Monad
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString,
|
||||||
|
isPrefixOf,
|
||||||
|
isSuffixOf)
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import Network.GRPC.LowLevel
|
import Network.GRPC.LowLevel
|
||||||
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
|
import qualified Network.GRPC.LowLevel.Call.Unregistered as U
|
||||||
|
@ -37,6 +39,9 @@ lowLevelTests = testGroup "Unit tests of low-level Haskell library"
|
||||||
, testGoaway
|
, testGoaway
|
||||||
, testSlowServer
|
, testSlowServer
|
||||||
, testServerCallExpirationCheck
|
, testServerCallExpirationCheck
|
||||||
|
, testCustomUserAgent
|
||||||
|
, testClientCompression
|
||||||
|
, testClientServerCompression
|
||||||
]
|
]
|
||||||
|
|
||||||
testGRPCBracket :: TestTree
|
testGRPCBracket :: TestTree
|
||||||
|
@ -204,7 +209,7 @@ testSlowServer =
|
||||||
let rm = head (registeredMethods s)
|
let rm = head (registeredMethods s)
|
||||||
serverHandleNormalCall s rm mempty $ \_ _ _ -> do
|
serverHandleNormalCall s rm mempty $ \_ _ _ -> do
|
||||||
threadDelay (2*10^(6 :: Int))
|
threadDelay (2*10^(6 :: Int))
|
||||||
return ("", mempty, StatusOk, StatusDetails "")
|
return dummyResp
|
||||||
return ()
|
return ()
|
||||||
|
|
||||||
testServerCallExpirationCheck :: TestTree
|
testServerCallExpirationCheck :: TestTree
|
||||||
|
@ -226,7 +231,73 @@ testServerCallExpirationCheck =
|
||||||
threadDelaySecs 3
|
threadDelaySecs 3
|
||||||
exp3 <- serverCallIsExpired c
|
exp3 <- serverCallIsExpired c
|
||||||
assertBool "Call is expired after 4 seconds" exp3
|
assertBool "Call is expired after 4 seconds" exp3
|
||||||
return ("", mempty, StatusCancelled, StatusDetails "")
|
return dummyResp
|
||||||
|
return ()
|
||||||
|
|
||||||
|
testCustomUserAgent :: TestTree
|
||||||
|
testCustomUserAgent =
|
||||||
|
csTest' "Server sees custom user agent prefix/suffix" client server
|
||||||
|
where
|
||||||
|
clientArgs = [UserAgentPrefix "prefix!", UserAgentSuffix "suffix!"]
|
||||||
|
client =
|
||||||
|
TestClient (ClientConfig "localhost" 50051 clientArgs) $
|
||||||
|
\c -> do rm <- clientRegisterMethod c "/foo" Normal
|
||||||
|
result <- clientRequest c rm 4 "" mempty
|
||||||
|
return ()
|
||||||
|
server = TestServer (stdServerConf [("/foo", Normal)]) $ \s -> do
|
||||||
|
let rm = head (registeredMethods s)
|
||||||
|
serverHandleNormalCall s rm mempty $ \_ _ meta -> do
|
||||||
|
let ua = meta M.! "user-agent"
|
||||||
|
assertBool "User agent prefix is present" $ isPrefixOf "prefix!" ua
|
||||||
|
assertBool "User agent suffix is present" $ isSuffixOf "suffix!" ua
|
||||||
|
return dummyResp
|
||||||
|
return ()
|
||||||
|
|
||||||
|
testClientCompression :: TestTree
|
||||||
|
testClientCompression =
|
||||||
|
csTest' "client-only compression: no errors" client server
|
||||||
|
where
|
||||||
|
client =
|
||||||
|
TestClient (ClientConfig
|
||||||
|
"localhost"
|
||||||
|
50051
|
||||||
|
[CompressionAlgArg GrpcCompressDeflate]) $ \c -> do
|
||||||
|
rm <- clientRegisterMethod c "/foo" Normal
|
||||||
|
result <- clientRequest c rm 1 "hello" mempty
|
||||||
|
return ()
|
||||||
|
server = TestServer (stdServerConf [("/foo", Normal)]) $ \s -> do
|
||||||
|
let rm = head (registeredMethods s)
|
||||||
|
serverHandleNormalCall s rm mempty $ \c body _ -> do
|
||||||
|
body @?= "hello"
|
||||||
|
return dummyResp
|
||||||
|
return ()
|
||||||
|
|
||||||
|
testClientServerCompression :: TestTree
|
||||||
|
testClientServerCompression =
|
||||||
|
csTest' "client/server compression: no errors" client server
|
||||||
|
where
|
||||||
|
cconf = ClientConfig "localhost"
|
||||||
|
50051
|
||||||
|
[CompressionAlgArg GrpcCompressDeflate]
|
||||||
|
client = TestClient cconf $ \c -> do
|
||||||
|
rm <- clientRegisterMethod c "/foo" Normal
|
||||||
|
clientRequest c rm 1 "hello" mempty >>= do
|
||||||
|
checkReqRslt $ \NormalRequestResult{..} -> do
|
||||||
|
rspCode @?= StatusOk
|
||||||
|
rspBody @?= "hello"
|
||||||
|
details @?= ""
|
||||||
|
initMD @?= Just dummyMeta
|
||||||
|
trailMD @?= dummyMeta
|
||||||
|
return ()
|
||||||
|
sconf = ServerConfig "localhost"
|
||||||
|
50051
|
||||||
|
[("/foo", Normal)]
|
||||||
|
[CompressionAlgArg GrpcCompressDeflate]
|
||||||
|
server = TestServer sconf $ \s -> do
|
||||||
|
let rm = head (registeredMethods s)
|
||||||
|
serverHandleNormalCall s rm dummyMeta $ \c body _ -> do
|
||||||
|
body @?= "hello"
|
||||||
|
return ("hello", dummyMeta, StatusOk, StatusDetails "")
|
||||||
return ()
|
return ()
|
||||||
|
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
|
@ -235,9 +306,11 @@ testServerCallExpirationCheck =
|
||||||
dummyMeta :: M.Map ByteString ByteString
|
dummyMeta :: M.Map ByteString ByteString
|
||||||
dummyMeta = [("foo","bar")]
|
dummyMeta = [("foo","bar")]
|
||||||
|
|
||||||
|
dummyResp = ("", mempty, StatusOk, StatusDetails "")
|
||||||
|
|
||||||
dummyHandler :: ServerCall -> ByteString -> MetadataMap
|
dummyHandler :: ServerCall -> ByteString -> MetadataMap
|
||||||
-> IO (ByteString, MetadataMap, StatusCode, StatusDetails)
|
-> IO (ByteString, MetadataMap, StatusCode, StatusDetails)
|
||||||
dummyHandler _ _ _ = return ("", mempty, StatusOk, StatusDetails "")
|
dummyHandler _ _ _ = return dummyResp
|
||||||
|
|
||||||
unavailableStatus :: Either GRPCIOError a
|
unavailableStatus :: Either GRPCIOError a
|
||||||
unavailableStatus =
|
unavailableStatus =
|
||||||
|
@ -302,7 +375,7 @@ stdTestClient :: (Client -> IO ()) -> TestClient
|
||||||
stdTestClient = TestClient stdClientConf
|
stdTestClient = TestClient stdClientConf
|
||||||
|
|
||||||
stdClientConf :: ClientConfig
|
stdClientConf :: ClientConfig
|
||||||
stdClientConf = ClientConfig "localhost" 50051
|
stdClientConf = ClientConfig "localhost" 50051 []
|
||||||
|
|
||||||
data TestServer = TestServer ServerConfig (Server -> IO ())
|
data TestServer = TestServer ServerConfig (Server -> IO ())
|
||||||
|
|
||||||
|
@ -313,7 +386,7 @@ stdTestServer :: [(MethodName, GRPCMethodType)] -> (Server -> IO ()) -> TestServ
|
||||||
stdTestServer = TestServer . stdServerConf
|
stdTestServer = TestServer . stdServerConf
|
||||||
|
|
||||||
stdServerConf :: [(MethodName, GRPCMethodType)] -> ServerConfig
|
stdServerConf :: [(MethodName, GRPCMethodType)] -> ServerConfig
|
||||||
stdServerConf = ServerConfig "localhost" 50051
|
stdServerConf xs = ServerConfig "localhost" 50051 xs []
|
||||||
|
|
||||||
|
|
||||||
threadDelaySecs :: Int -> IO ()
|
threadDelaySecs :: Int -> IO ()
|
||||||
|
|
|
@ -87,9 +87,9 @@ withClientServerUnaryCall grpc f = do
|
||||||
withServerCall s srm $ \sc ->
|
withServerCall s srm $ \sc ->
|
||||||
f (c, s, cc, sc)
|
f (c, s, cc, sc)
|
||||||
|
|
||||||
serverConf = (ServerConfig "localhost" 50051 [("/foo", Normal)])
|
serverConf = ServerConfig "localhost" 50051 [("/foo", Normal)] []
|
||||||
|
|
||||||
clientConf = (ClientConfig "localhost" 50051)
|
clientConf = ClientConfig "localhost" 50051 []
|
||||||
|
|
||||||
clientEmptySendOps = [OpSendInitialMetadata mempty,
|
clientEmptySendOps = [OpSendInitialMetadata mempty,
|
||||||
OpSendMessage "",
|
OpSendMessage "",
|
||||||
|
|
|
@ -16,6 +16,7 @@ import Network.GRPC.Unsafe.Metadata
|
||||||
import Network.GRPC.Unsafe.Op
|
import Network.GRPC.Unsafe.Op
|
||||||
import Network.GRPC.Unsafe.Slice
|
import Network.GRPC.Unsafe.Slice
|
||||||
import Network.GRPC.Unsafe.Time
|
import Network.GRPC.Unsafe.Time
|
||||||
|
import Network.GRPC.Unsafe.ChannelArgs
|
||||||
import Test.Tasty
|
import Test.Tasty
|
||||||
import Test.Tasty.HUnit as HU (testCase, (@?=))
|
import Test.Tasty.HUnit as HU (testCase, (@?=))
|
||||||
|
|
||||||
|
@ -23,14 +24,20 @@ unsafeTests :: TestTree
|
||||||
unsafeTests = testGroup "Unit tests for unsafe C bindings"
|
unsafeTests = testGroup "Unit tests for unsafe C bindings"
|
||||||
[ roundtripSlice "Hello, world!"
|
[ roundtripSlice "Hello, world!"
|
||||||
, roundtripByteBuffer "Hwaet! We gardena in geardagum..."
|
, roundtripByteBuffer "Hwaet! We gardena in geardagum..."
|
||||||
|
, roundtripSlice largeByteString
|
||||||
|
, roundtripByteBuffer largeByteString
|
||||||
, testMetadata
|
, testMetadata
|
||||||
, testNow
|
, testNow
|
||||||
, testCreateDestroyMetadata
|
, testCreateDestroyMetadata
|
||||||
, testCreateDestroyMetadataKeyVals
|
, testCreateDestroyMetadataKeyVals
|
||||||
, testCreateDestroyDeadline
|
, testCreateDestroyDeadline
|
||||||
, testPayload
|
, testPayload
|
||||||
|
, testCreateDestroyChannelArgs
|
||||||
]
|
]
|
||||||
|
|
||||||
|
largeByteString :: B.ByteString
|
||||||
|
largeByteString = B.pack $ take (32*1024*1024) $ cycle [97..99]
|
||||||
|
|
||||||
roundtripSlice :: B.ByteString -> TestTree
|
roundtripSlice :: B.ByteString -> TestTree
|
||||||
roundtripSlice bs = testCase "ByteString slice roundtrip" $ do
|
roundtripSlice bs = testCase "ByteString slice roundtrip" $ do
|
||||||
slice <- byteStringToSlice bs
|
slice <- byteStringToSlice bs
|
||||||
|
@ -98,6 +105,11 @@ testCreateDestroyDeadline :: TestTree
|
||||||
testCreateDestroyDeadline = testCase "Create/destroy deadline" $ do
|
testCreateDestroyDeadline = testCase "Create/destroy deadline" $ do
|
||||||
grpc $ withDeadlineSeconds 10 $ const $ return ()
|
grpc $ withDeadlineSeconds 10 $ const $ return ()
|
||||||
|
|
||||||
|
testCreateDestroyChannelArgs :: TestTree
|
||||||
|
testCreateDestroyChannelArgs = testCase "Create/destroy channel args" $
|
||||||
|
grpc $ withChannelArgs [CompressionAlgArg GrpcCompressDeflate] $
|
||||||
|
const $ return ()
|
||||||
|
|
||||||
assertCqEventComplete :: Event -> IO ()
|
assertCqEventComplete :: Event -> IO ()
|
||||||
assertCqEventComplete e = do
|
assertCqEventComplete e = do
|
||||||
eventCompletionType e HU.@?= OpComplete
|
eventCompletionType e HU.@?= OpComplete
|
||||||
|
|
Loading…
Add table
Reference in a new issue