mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2025-02-17 05:35:02 +01:00
Add ability to send metadata for server registered calls (#15)
* projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall * function for checking client connection * test metadata transmission * send initial metadata for registered calls * projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall * function for checking client connection * test metadata transmission * send initial metadata for registered calls
This commit is contained in:
parent
091bf4a457
commit
9ffdec4c56
10 changed files with 77 additions and 20 deletions
|
@ -343,10 +343,24 @@ grpc_metadata* metadata_array_get_metadata(grpc_metadata_array* arr){
|
||||||
return arr->metadata;
|
return arr->metadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void metadata_array_set_metadata(grpc_metadata_array* arr, grpc_metadata* meta){
|
||||||
|
arr->metadata = meta;
|
||||||
|
//NOTE: we assume count == capacity because that's how the 'createMetadata'
|
||||||
|
//Haskell function works. It isn't safe to call this function if the
|
||||||
|
//metadata was created in some other way.
|
||||||
|
size_t n = sizeof(meta);
|
||||||
|
arr->count = n;
|
||||||
|
arr->capacity = n;
|
||||||
|
}
|
||||||
|
|
||||||
size_t metadata_array_get_count(grpc_metadata_array* arr){
|
size_t metadata_array_get_count(grpc_metadata_array* arr){
|
||||||
return arr->count;
|
return arr->count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t metadata_array_get_capacity(grpc_metadata_array* arr){
|
||||||
|
return arr->capacity;
|
||||||
|
}
|
||||||
|
|
||||||
grpc_call* grpc_channel_create_registered_call_(
|
grpc_call* grpc_channel_create_registered_call_(
|
||||||
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
|
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
|
||||||
grpc_completion_queue *completion_queue, void *registered_call_handle,
|
grpc_completion_queue *completion_queue, void *registered_call_handle,
|
||||||
|
|
|
@ -116,8 +116,12 @@ void grpc_channel_watch_connectivity_state_(grpc_channel *channel,
|
||||||
|
|
||||||
grpc_metadata* metadata_array_get_metadata(grpc_metadata_array* arr);
|
grpc_metadata* metadata_array_get_metadata(grpc_metadata_array* arr);
|
||||||
|
|
||||||
|
void metadata_array_set_metadata(grpc_metadata_array* arr, grpc_metadata* meta);
|
||||||
|
|
||||||
size_t metadata_array_get_count(grpc_metadata_array* arr);
|
size_t metadata_array_get_count(grpc_metadata_array* arr);
|
||||||
|
|
||||||
|
size_t metadata_array_get_capacity(grpc_metadata_array* arr);
|
||||||
|
|
||||||
grpc_call* grpc_channel_create_registered_call_(
|
grpc_call* grpc_channel_create_registered_call_(
|
||||||
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
|
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
|
||||||
grpc_completion_queue *completion_queue, void *registered_call_handle,
|
grpc_completion_queue *completion_queue, void *registered_call_handle,
|
||||||
|
|
|
@ -39,6 +39,8 @@ GRPC
|
||||||
, ClientConfig(..)
|
, ClientConfig(..)
|
||||||
, Client
|
, Client
|
||||||
, ClientCall
|
, ClientCall
|
||||||
|
, ConnectivityState(..)
|
||||||
|
, clientConnectivity
|
||||||
, withClient
|
, withClient
|
||||||
, clientRegisterMethod
|
, clientRegisterMethod
|
||||||
, clientRegisteredRequest
|
, clientRegisteredRequest
|
||||||
|
@ -62,4 +64,5 @@ import Network.GRPC.LowLevel.Op
|
||||||
import Network.GRPC.LowLevel.Client
|
import Network.GRPC.LowLevel.Client
|
||||||
import Network.GRPC.LowLevel.Call
|
import Network.GRPC.LowLevel.Call
|
||||||
|
|
||||||
|
import Network.GRPC.Unsafe (ConnectivityState(..))
|
||||||
import Network.GRPC.Unsafe.Op (StatusCode(..))
|
import Network.GRPC.Unsafe.Op (StatusCode(..))
|
||||||
|
|
|
@ -61,7 +61,10 @@ serverRegCallGetMetadata ServerRegCall{..} = do
|
||||||
-- | Extract the client request body from the given registered call, if present.
|
-- | Extract the client request body from the given registered call, if present.
|
||||||
-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library
|
-- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library
|
||||||
-- calls the underlying out parameter "optional_payload". I am not sure exactly
|
-- calls the underlying out parameter "optional_payload". I am not sure exactly
|
||||||
-- in what cases it won't be present.
|
-- in what cases it won't be present. The C++ library checks a
|
||||||
|
-- has_request_payload_ bool and passes in nullptr to request_registered_call
|
||||||
|
-- if the bool is false, so we may be able to do the payload present/absent
|
||||||
|
-- check earlier.
|
||||||
serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString)
|
serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString)
|
||||||
serverRegCallGetPayload ServerRegCall{..} = do
|
serverRegCallGetPayload ServerRegCall{..} = do
|
||||||
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
|
bb@(C.ByteBuffer rawPtr) <- peek optionalPayload
|
||||||
|
|
|
@ -45,6 +45,10 @@ withClient grpc config = bracket (createClient grpc config)
|
||||||
(\c -> grpcDebug "withClient: destroying."
|
(\c -> grpcDebug "withClient: destroying."
|
||||||
>> destroyClient c)
|
>> destroyClient c)
|
||||||
|
|
||||||
|
clientConnectivity :: Client -> IO C.ConnectivityState
|
||||||
|
clientConnectivity Client{..} =
|
||||||
|
C.grpcChannelCheckConnectivityState clientChannel False
|
||||||
|
|
||||||
-- | Register a method on the client so that we can call it with
|
-- | Register a method on the client so that we can call it with
|
||||||
-- 'clientRegisteredRequest'.
|
-- 'clientRegisteredRequest'.
|
||||||
clientRegisterMethod :: Client
|
clientRegisterMethod :: Client
|
||||||
|
@ -137,9 +141,9 @@ compileNormalRequestResults
|
||||||
OpRecvStatusOnClientResult m2 status details]
|
OpRecvStatusOnClientResult m2 status details]
|
||||||
= Right $ NormalRequestResult body Nothing m2 status (StatusDetails details)
|
= Right $ NormalRequestResult body Nothing m2 status (StatusDetails details)
|
||||||
compileNormalRequestResults x =
|
compileNormalRequestResults x =
|
||||||
case extractStatus x of
|
case extractStatusInfo x of
|
||||||
Nothing -> Left GRPCIOUnknownError
|
Nothing -> Left GRPCIOUnknownError
|
||||||
Just (OpRecvStatusOnClientResult _ status details) ->
|
Just (_meta, status, details) ->
|
||||||
Left (GRPCIOBadStatusCode status (StatusDetails details))
|
Left (GRPCIOBadStatusCode status (StatusDetails details))
|
||||||
|
|
||||||
-- | Make a request of the given method with the given body. Returns the
|
-- | Make a request of the given method with the given body. Returns the
|
||||||
|
@ -176,7 +180,9 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..})
|
||||||
Left x -> do grpcDebug "clientRegisteredRequest: batch error."
|
Left x -> do grpcDebug "clientRegisteredRequest: batch error."
|
||||||
return $ Left x
|
return $ Left x
|
||||||
Right rs -> do
|
Right rs -> do
|
||||||
let recvOps = [OpRecvMessage, OpRecvStatusOnClient]
|
let recvOps = [OpRecvInitialMetadata,
|
||||||
|
OpRecvMessage,
|
||||||
|
OpRecvStatusOnClient]
|
||||||
recvRes <- runClientOps call clientCQ recvOps timeLimit
|
recvRes <- runClientOps call clientCQ recvOps timeLimit
|
||||||
case recvRes of
|
case recvRes of
|
||||||
Left x -> do
|
Left x -> do
|
||||||
|
|
|
@ -250,10 +250,10 @@ channelCreateCall
|
||||||
|
|
||||||
-- | Create the call object to handle a registered call.
|
-- | Create the call object to handle a registered call.
|
||||||
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds
|
serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds
|
||||||
-> RegisteredMethod
|
-> RegisteredMethod -> MetadataMap
|
||||||
-> IO (Either GRPCIOError ServerRegCall)
|
-> IO (Either GRPCIOError ServerRegCall)
|
||||||
serverRequestRegisteredCall
|
serverRequestRegisteredCall
|
||||||
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} =
|
server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta =
|
||||||
withPermission Push cq $ do
|
withPermission Push cq $ do
|
||||||
-- TODO: Is gRPC supposed to populate this deadline?
|
-- TODO: Is gRPC supposed to populate this deadline?
|
||||||
-- NOTE: the below stuff is freed when we free the call we return.
|
-- NOTE: the below stuff is freed when we free the call we return.
|
||||||
|
@ -261,6 +261,15 @@ serverRequestRegisteredCall
|
||||||
callPtr <- malloc
|
callPtr <- malloc
|
||||||
metadataArrayPtr <- C.metadataArrayCreate
|
metadataArrayPtr <- C.metadataArrayCreate
|
||||||
metadataArray <- peek metadataArrayPtr
|
metadataArray <- peek metadataArrayPtr
|
||||||
|
#ifdef DEBUG
|
||||||
|
metaCount <- C.metadataArrayGetCount metadataArray
|
||||||
|
metaCap <- C.metadataArrayGetCapacity metadataArray
|
||||||
|
kvPtr <- C.metadataArrayGetMetadata metadataArray
|
||||||
|
grpcDebug $ "grpc-created meta: count: " ++ show metaCount
|
||||||
|
++ " capacity: " ++ show metaCap ++ " ptr: " ++ show kvPtr
|
||||||
|
#endif
|
||||||
|
metadataContents <- C.createMetadata initMeta
|
||||||
|
C.metadataArraySetMetadata metadataArray metadataContents
|
||||||
bbPtr <- malloc
|
bbPtr <- malloc
|
||||||
tag <- newTag cq
|
tag <- newTag cq
|
||||||
callError <- C.grpcServerRequestRegisteredCall
|
callError <- C.grpcServerRequestRegisteredCall
|
||||||
|
|
|
@ -257,7 +257,11 @@ runClientOps :: ClientCall
|
||||||
-> IO (Either GRPCIOError [OpRecvResult])
|
-> IO (Either GRPCIOError [OpRecvResult])
|
||||||
runClientOps = runOps . internalClientCall
|
runClientOps = runOps . internalClientCall
|
||||||
|
|
||||||
extractStatus :: [OpRecvResult] -> Maybe OpRecvResult
|
-- | If response status info is present in the given 'OpRecvResult's, returns
|
||||||
extractStatus [] = Nothing
|
-- a tuple of trailing metadata, status code, and status details.
|
||||||
extractStatus (res@(OpRecvStatusOnClientResult _ _ _):_) = Just res
|
extractStatusInfo :: [OpRecvResult]
|
||||||
extractStatus (_:xs) = extractStatus xs
|
-> Maybe (MetadataMap, C.StatusCode, B.ByteString)
|
||||||
|
extractStatusInfo [] = Nothing
|
||||||
|
extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) =
|
||||||
|
Just (meta, code, details)
|
||||||
|
extractStatusInfo (_:xs) = extractStatusInfo xs
|
||||||
|
|
|
@ -118,16 +118,18 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet."
|
||||||
-- | Create a 'Call' with which to wait for the invocation of a registered
|
-- | Create a 'Call' with which to wait for the invocation of a registered
|
||||||
-- method.
|
-- method.
|
||||||
serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
|
serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
|
||||||
|
-> MetadataMap
|
||||||
-> IO (Either GRPCIOError ServerRegCall)
|
-> IO (Either GRPCIOError ServerRegCall)
|
||||||
serverCreateRegisteredCall Server{..} rm timeLimit =
|
serverCreateRegisteredCall Server{..} rm timeLimit initMeta =
|
||||||
serverRequestRegisteredCall internalServer serverCQ timeLimit rm
|
serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta
|
||||||
|
|
||||||
withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
|
withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds
|
||||||
|
-> MetadataMap
|
||||||
-> (ServerRegCall
|
-> (ServerRegCall
|
||||||
-> IO (Either GRPCIOError a))
|
-> IO (Either GRPCIOError a))
|
||||||
-> IO (Either GRPCIOError a)
|
-> IO (Either GRPCIOError a)
|
||||||
withServerRegisteredCall server regmethod timeout f = do
|
withServerRegisteredCall server regmethod timeout initMeta f = do
|
||||||
createResult <- serverCreateRegisteredCall server regmethod timeout
|
createResult <- serverCreateRegisteredCall server regmethod timeout initMeta
|
||||||
case createResult of
|
case createResult of
|
||||||
Left x -> return $ Left x
|
Left x -> return $ Left x
|
||||||
Right call -> f call `finally` logDestroy call
|
Right call -> f call `finally` logDestroy call
|
||||||
|
@ -204,7 +206,7 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do
|
||||||
-- Should we just hard-code time limits instead? Not sure if client
|
-- Should we just hard-code time limits instead? Not sure if client
|
||||||
-- programmer cares, since this function will likely just be put in a loop
|
-- programmer cares, since this function will likely just be put in a loop
|
||||||
-- anyway.
|
-- anyway.
|
||||||
withServerRegisteredCall s rm timeLimit $ \call -> do
|
withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do
|
||||||
grpcDebug "serverHandleNormalRegisteredCall: starting batch."
|
grpcDebug "serverHandleNormalRegisteredCall: starting batch."
|
||||||
debugServerRegCall call
|
debugServerRegCall call
|
||||||
payload <- serverRegCallGetPayload call
|
payload <- serverRegCallGetPayload call
|
||||||
|
|
|
@ -25,8 +25,6 @@ deriving instance Show MetadataKeyValPtr
|
||||||
-- | Represents a pointer to a grpc_metadata_array. Must be destroyed with
|
-- | Represents a pointer to a grpc_metadata_array. Must be destroyed with
|
||||||
-- 'metadataArrayDestroy'. This type is intended for receiving metadata.
|
-- 'metadataArrayDestroy'. This type is intended for receiving metadata.
|
||||||
-- This can be populated by passing it to e.g. 'grpcServerRequestCall'.
|
-- This can be populated by passing it to e.g. 'grpcServerRequestCall'.
|
||||||
-- TODO: we need a function for getting a 'MetadataKeyValPtr'
|
|
||||||
-- and length from this type.
|
|
||||||
{#pointer *grpc_metadata_array as MetadataArray newtype#}
|
{#pointer *grpc_metadata_array as MetadataArray newtype#}
|
||||||
|
|
||||||
deriving instance Show MetadataArray
|
deriving instance Show MetadataArray
|
||||||
|
@ -34,8 +32,16 @@ deriving instance Show MetadataArray
|
||||||
{#fun metadata_array_get_metadata as ^
|
{#fun metadata_array_get_metadata as ^
|
||||||
{`MetadataArray'} -> `MetadataKeyValPtr'#}
|
{`MetadataArray'} -> `MetadataKeyValPtr'#}
|
||||||
|
|
||||||
|
-- | Overwrites the metadata in the given 'MetadataArray'. The given
|
||||||
|
-- 'MetadataKeyValPtr' *must* have been created with 'createMetadata' in this
|
||||||
|
-- module.
|
||||||
|
{#fun metadata_array_set_metadata as ^
|
||||||
|
{`MetadataArray', `MetadataKeyValPtr'} -> `()'#}
|
||||||
|
|
||||||
{#fun metadata_array_get_count as ^ {`MetadataArray'} -> `Int'#}
|
{#fun metadata_array_get_count as ^ {`MetadataArray'} -> `Int'#}
|
||||||
|
|
||||||
|
{#fun metadata_array_get_capacity as ^ {`MetadataArray'} -> `Int'#}
|
||||||
|
|
||||||
instance Storable MetadataArray where
|
instance Storable MetadataArray where
|
||||||
sizeOf (MetadataArray r) = sizeOf r
|
sizeOf (MetadataArray r) = sizeOf r
|
||||||
alignment (MetadataArray r) = alignment r
|
alignment (MetadataArray r) = alignment r
|
||||||
|
|
|
@ -48,7 +48,9 @@ payloadLowLevelServer = TestServer $ \grpc -> do
|
||||||
withServer grpc conf $ \server -> do
|
withServer grpc conf $ \server -> do
|
||||||
let method = head (registeredMethods server)
|
let method = head (registeredMethods server)
|
||||||
result <- serverHandleNormalRegisteredCall server method 11 M.empty $
|
result <- serverHandleNormalRegisteredCall server method 11 M.empty $
|
||||||
\_reqBody _reqMeta ->
|
\reqBody reqMeta -> do
|
||||||
|
reqMeta M.! "foo_key" @?= "foo_val"
|
||||||
|
reqBody @?= "Hello!"
|
||||||
return ("reply test", dummyMeta, dummyMeta,
|
return ("reply test", dummyMeta, dummyMeta,
|
||||||
StatusDetails "details string")
|
StatusDetails "details string")
|
||||||
case result of
|
case result of
|
||||||
|
@ -60,13 +62,17 @@ payloadLowLevelClient = TestClient $ \grpc ->
|
||||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||||
method <- clientRegisterMethod client "/foo" "localhost" Normal
|
method <- clientRegisterMethod client "/foo" "localhost" Normal
|
||||||
putStrLn "registered method on client."
|
putStrLn "registered method on client."
|
||||||
reqResult <- clientRegisteredRequest client method 10 "Hello!" M.empty
|
let reqMeta = M.fromList [("foo_key", "foo_val")]
|
||||||
|
reqResult <- clientRegisteredRequest client method 10 "Hello!" reqMeta
|
||||||
case reqResult of
|
case reqResult of
|
||||||
Left x -> error $ "Client got error: " ++ show x
|
Left x -> error $ "Client got error: " ++ show x
|
||||||
Right (NormalRequestResult respBody _initMeta _trailingMeta respCode details) -> do
|
Right (NormalRequestResult respBody (Just initMeta) trailingMeta respCode details) -> do
|
||||||
details @?= "details string"
|
details @?= "details string"
|
||||||
respBody @?= "reply test"
|
respBody @?= "reply test"
|
||||||
respCode @?= GrpcStatusOk
|
respCode @?= GrpcStatusOk
|
||||||
|
initMeta M.! "foo" @?= "bar"
|
||||||
|
trailingMeta M.! "foo" @?= "bar"
|
||||||
|
Right (NormalRequestResult _ Nothing _ _ _) -> error $ "got no metadata."
|
||||||
|
|
||||||
payloadLowLevelClientUnregistered :: TestClient
|
payloadLowLevelClientUnregistered :: TestClient
|
||||||
payloadLowLevelClientUnregistered = TestClient $ \grpc -> do
|
payloadLowLevelClientUnregistered = TestClient $ \grpc -> do
|
||||||
|
|
Loading…
Add table
Reference in a new issue