From 9ffdec4c5652349ced6f7ffe804b5f743b06b252 Mon Sep 17 00:00:00 2001 From: Connor Clark Date: Thu, 2 Jun 2016 08:55:29 -0700 Subject: [PATCH] Add ability to send metadata for server registered calls (#15) * projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall * function for checking client connection * test metadata transmission * send initial metadata for registered calls * projections from CallDetails * refactor Call, refactor clientRegisteredRequest, handle null error * split ServerCall into separate reg/unreg types * pass method name to unreg call handler, finish destroyServerUnregCall * function for checking client connection * test metadata transmission * send initial metadata for registered calls --- cbits/grpc_haskell.c | 14 ++++++++++++++ include/grpc_haskell.h | 4 ++++ src/Network/GRPC/LowLevel.hs | 3 +++ src/Network/GRPC/LowLevel/Call.hs | 5 ++++- src/Network/GRPC/LowLevel/Client.hs | 12 +++++++++--- src/Network/GRPC/LowLevel/CompletionQueue.hs | 13 +++++++++++-- src/Network/GRPC/LowLevel/Op.hs | 12 ++++++++---- src/Network/GRPC/LowLevel/Server.hs | 12 +++++++----- src/Network/GRPC/Unsafe/Metadata.chs | 10 ++++++++-- tests/LowLevelTests.hs | 12 +++++++++--- 10 files changed, 77 insertions(+), 20 deletions(-) diff --git a/cbits/grpc_haskell.c b/cbits/grpc_haskell.c index c2f2228..d0790fa 100644 --- a/cbits/grpc_haskell.c +++ b/cbits/grpc_haskell.c @@ -343,10 +343,24 @@ grpc_metadata* metadata_array_get_metadata(grpc_metadata_array* arr){ return arr->metadata; } +void metadata_array_set_metadata(grpc_metadata_array* arr, grpc_metadata* meta){ + arr->metadata = meta; + //NOTE: we assume count == capacity because that's how the 'createMetadata' + //Haskell function works. It isn't safe to call this function if the + //metadata was created in some other way. + size_t n = sizeof(meta); + arr->count = n; + arr->capacity = n; +} + size_t metadata_array_get_count(grpc_metadata_array* arr){ return arr->count; } +size_t metadata_array_get_capacity(grpc_metadata_array* arr){ + return arr->capacity; +} + grpc_call* grpc_channel_create_registered_call_( grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, diff --git a/include/grpc_haskell.h b/include/grpc_haskell.h index a2d0218..1bd7368 100644 --- a/include/grpc_haskell.h +++ b/include/grpc_haskell.h @@ -116,8 +116,12 @@ void grpc_channel_watch_connectivity_state_(grpc_channel *channel, grpc_metadata* metadata_array_get_metadata(grpc_metadata_array* arr); +void metadata_array_set_metadata(grpc_metadata_array* arr, grpc_metadata* meta); + size_t metadata_array_get_count(grpc_metadata_array* arr); +size_t metadata_array_get_capacity(grpc_metadata_array* arr); + grpc_call* grpc_channel_create_registered_call_( grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, diff --git a/src/Network/GRPC/LowLevel.hs b/src/Network/GRPC/LowLevel.hs index a0b076a..9023012 100644 --- a/src/Network/GRPC/LowLevel.hs +++ b/src/Network/GRPC/LowLevel.hs @@ -39,6 +39,8 @@ GRPC , ClientConfig(..) , Client , ClientCall +, ConnectivityState(..) +, clientConnectivity , withClient , clientRegisterMethod , clientRegisteredRequest @@ -62,4 +64,5 @@ import Network.GRPC.LowLevel.Op import Network.GRPC.LowLevel.Client import Network.GRPC.LowLevel.Call +import Network.GRPC.Unsafe (ConnectivityState(..)) import Network.GRPC.Unsafe.Op (StatusCode(..)) diff --git a/src/Network/GRPC/LowLevel/Call.hs b/src/Network/GRPC/LowLevel/Call.hs index 53fca50..5c9d885 100644 --- a/src/Network/GRPC/LowLevel/Call.hs +++ b/src/Network/GRPC/LowLevel/Call.hs @@ -61,7 +61,10 @@ serverRegCallGetMetadata ServerRegCall{..} = do -- | Extract the client request body from the given registered call, if present. -- TODO: the reason this returns @Maybe ByteString@ is because the gRPC library -- calls the underlying out parameter "optional_payload". I am not sure exactly --- in what cases it won't be present. +-- in what cases it won't be present. The C++ library checks a +-- has_request_payload_ bool and passes in nullptr to request_registered_call +-- if the bool is false, so we may be able to do the payload present/absent +-- check earlier. serverRegCallGetPayload :: ServerRegCall -> IO (Maybe ByteString) serverRegCallGetPayload ServerRegCall{..} = do bb@(C.ByteBuffer rawPtr) <- peek optionalPayload diff --git a/src/Network/GRPC/LowLevel/Client.hs b/src/Network/GRPC/LowLevel/Client.hs index 717b8c3..9d8603f 100644 --- a/src/Network/GRPC/LowLevel/Client.hs +++ b/src/Network/GRPC/LowLevel/Client.hs @@ -45,6 +45,10 @@ withClient grpc config = bracket (createClient grpc config) (\c -> grpcDebug "withClient: destroying." >> destroyClient c) +clientConnectivity :: Client -> IO C.ConnectivityState +clientConnectivity Client{..} = + C.grpcChannelCheckConnectivityState clientChannel False + -- | Register a method on the client so that we can call it with -- 'clientRegisteredRequest'. clientRegisterMethod :: Client @@ -137,9 +141,9 @@ compileNormalRequestResults OpRecvStatusOnClientResult m2 status details] = Right $ NormalRequestResult body Nothing m2 status (StatusDetails details) compileNormalRequestResults x = - case extractStatus x of + case extractStatusInfo x of Nothing -> Left GRPCIOUnknownError - Just (OpRecvStatusOnClientResult _ status details) -> + Just (_meta, status, details) -> Left (GRPCIOBadStatusCode status (StatusDetails details)) -- | Make a request of the given method with the given body. Returns the @@ -176,7 +180,9 @@ clientRegisteredRequest client@(Client{..}) rm@(RegisteredMethod{..}) Left x -> do grpcDebug "clientRegisteredRequest: batch error." return $ Left x Right rs -> do - let recvOps = [OpRecvMessage, OpRecvStatusOnClient] + let recvOps = [OpRecvInitialMetadata, + OpRecvMessage, + OpRecvStatusOnClient] recvRes <- runClientOps call clientCQ recvOps timeLimit case recvRes of Left x -> do diff --git a/src/Network/GRPC/LowLevel/CompletionQueue.hs b/src/Network/GRPC/LowLevel/CompletionQueue.hs index 542e3a2..ae7005d 100644 --- a/src/Network/GRPC/LowLevel/CompletionQueue.hs +++ b/src/Network/GRPC/LowLevel/CompletionQueue.hs @@ -250,10 +250,10 @@ channelCreateCall -- | Create the call object to handle a registered call. serverRequestRegisteredCall :: C.Server -> CompletionQueue -> TimeoutSeconds - -> RegisteredMethod + -> RegisteredMethod -> MetadataMap -> IO (Either GRPCIOError ServerRegCall) serverRequestRegisteredCall - server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} = + server cq@CompletionQueue{..} timeLimit RegisteredMethod{..} initMeta = withPermission Push cq $ do -- TODO: Is gRPC supposed to populate this deadline? -- NOTE: the below stuff is freed when we free the call we return. @@ -261,6 +261,15 @@ serverRequestRegisteredCall callPtr <- malloc metadataArrayPtr <- C.metadataArrayCreate metadataArray <- peek metadataArrayPtr + #ifdef DEBUG + metaCount <- C.metadataArrayGetCount metadataArray + metaCap <- C.metadataArrayGetCapacity metadataArray + kvPtr <- C.metadataArrayGetMetadata metadataArray + grpcDebug $ "grpc-created meta: count: " ++ show metaCount + ++ " capacity: " ++ show metaCap ++ " ptr: " ++ show kvPtr + #endif + metadataContents <- C.createMetadata initMeta + C.metadataArraySetMetadata metadataArray metadataContents bbPtr <- malloc tag <- newTag cq callError <- C.grpcServerRequestRegisteredCall diff --git a/src/Network/GRPC/LowLevel/Op.hs b/src/Network/GRPC/LowLevel/Op.hs index 3f5747a..fc39016 100644 --- a/src/Network/GRPC/LowLevel/Op.hs +++ b/src/Network/GRPC/LowLevel/Op.hs @@ -257,7 +257,11 @@ runClientOps :: ClientCall -> IO (Either GRPCIOError [OpRecvResult]) runClientOps = runOps . internalClientCall -extractStatus :: [OpRecvResult] -> Maybe OpRecvResult -extractStatus [] = Nothing -extractStatus (res@(OpRecvStatusOnClientResult _ _ _):_) = Just res -extractStatus (_:xs) = extractStatus xs +-- | If response status info is present in the given 'OpRecvResult's, returns +-- a tuple of trailing metadata, status code, and status details. +extractStatusInfo :: [OpRecvResult] + -> Maybe (MetadataMap, C.StatusCode, B.ByteString) +extractStatusInfo [] = Nothing +extractStatusInfo (res@(OpRecvStatusOnClientResult meta code details):_) = + Just (meta, code, details) +extractStatusInfo (_:xs) = extractStatusInfo xs diff --git a/src/Network/GRPC/LowLevel/Server.hs b/src/Network/GRPC/LowLevel/Server.hs index 4a36a78..aaa124b 100644 --- a/src/Network/GRPC/LowLevel/Server.hs +++ b/src/Network/GRPC/LowLevel/Server.hs @@ -118,16 +118,18 @@ serverRegisterMethod _ _ _ _ = error "Streaming methods not implemented yet." -- | Create a 'Call' with which to wait for the invocation of a registered -- method. serverCreateRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds + -> MetadataMap -> IO (Either GRPCIOError ServerRegCall) -serverCreateRegisteredCall Server{..} rm timeLimit = - serverRequestRegisteredCall internalServer serverCQ timeLimit rm +serverCreateRegisteredCall Server{..} rm timeLimit initMeta = + serverRequestRegisteredCall internalServer serverCQ timeLimit rm initMeta withServerRegisteredCall :: Server -> RegisteredMethod -> TimeoutSeconds + -> MetadataMap -> (ServerRegCall -> IO (Either GRPCIOError a)) -> IO (Either GRPCIOError a) -withServerRegisteredCall server regmethod timeout f = do - createResult <- serverCreateRegisteredCall server regmethod timeout +withServerRegisteredCall server regmethod timeout initMeta f = do + createResult <- serverCreateRegisteredCall server regmethod timeout initMeta case createResult of Left x -> return $ Left x Right call -> f call `finally` logDestroy call @@ -204,7 +206,7 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do -- Should we just hard-code time limits instead? Not sure if client -- programmer cares, since this function will likely just be put in a loop -- anyway. - withServerRegisteredCall s rm timeLimit $ \call -> do + withServerRegisteredCall s rm timeLimit srvMetadata $ \call -> do grpcDebug "serverHandleNormalRegisteredCall: starting batch." debugServerRegCall call payload <- serverRegCallGetPayload call diff --git a/src/Network/GRPC/Unsafe/Metadata.chs b/src/Network/GRPC/Unsafe/Metadata.chs index 13264fd..0ab7121 100644 --- a/src/Network/GRPC/Unsafe/Metadata.chs +++ b/src/Network/GRPC/Unsafe/Metadata.chs @@ -25,8 +25,6 @@ deriving instance Show MetadataKeyValPtr -- | Represents a pointer to a grpc_metadata_array. Must be destroyed with -- 'metadataArrayDestroy'. This type is intended for receiving metadata. -- This can be populated by passing it to e.g. 'grpcServerRequestCall'. --- TODO: we need a function for getting a 'MetadataKeyValPtr' --- and length from this type. {#pointer *grpc_metadata_array as MetadataArray newtype#} deriving instance Show MetadataArray @@ -34,8 +32,16 @@ deriving instance Show MetadataArray {#fun metadata_array_get_metadata as ^ {`MetadataArray'} -> `MetadataKeyValPtr'#} +-- | Overwrites the metadata in the given 'MetadataArray'. The given +-- 'MetadataKeyValPtr' *must* have been created with 'createMetadata' in this +-- module. +{#fun metadata_array_set_metadata as ^ + {`MetadataArray', `MetadataKeyValPtr'} -> `()'#} + {#fun metadata_array_get_count as ^ {`MetadataArray'} -> `Int'#} +{#fun metadata_array_get_capacity as ^ {`MetadataArray'} -> `Int'#} + instance Storable MetadataArray where sizeOf (MetadataArray r) = sizeOf r alignment (MetadataArray r) = alignment r diff --git a/tests/LowLevelTests.hs b/tests/LowLevelTests.hs index 035741c..f5a9d4d 100644 --- a/tests/LowLevelTests.hs +++ b/tests/LowLevelTests.hs @@ -48,7 +48,9 @@ payloadLowLevelServer = TestServer $ \grpc -> do withServer grpc conf $ \server -> do let method = head (registeredMethods server) result <- serverHandleNormalRegisteredCall server method 11 M.empty $ - \_reqBody _reqMeta -> + \reqBody reqMeta -> do + reqMeta M.! "foo_key" @?= "foo_val" + reqBody @?= "Hello!" return ("reply test", dummyMeta, dummyMeta, StatusDetails "details string") case result of @@ -60,13 +62,17 @@ payloadLowLevelClient = TestClient $ \grpc -> withClient grpc (ClientConfig "localhost" 50051) $ \client -> do method <- clientRegisterMethod client "/foo" "localhost" Normal putStrLn "registered method on client." - reqResult <- clientRegisteredRequest client method 10 "Hello!" M.empty + let reqMeta = M.fromList [("foo_key", "foo_val")] + reqResult <- clientRegisteredRequest client method 10 "Hello!" reqMeta case reqResult of Left x -> error $ "Client got error: " ++ show x - Right (NormalRequestResult respBody _initMeta _trailingMeta respCode details) -> do + Right (NormalRequestResult respBody (Just initMeta) trailingMeta respCode details) -> do details @?= "details string" respBody @?= "reply test" respCode @?= GrpcStatusOk + initMeta M.! "foo" @?= "bar" + trailingMeta M.! "foo" @?= "bar" + Right (NormalRequestResult _ Nothing _ _ _) -> error $ "got no metadata." payloadLowLevelClientUnregistered :: TestClient payloadLowLevelClientUnregistered = TestClient $ \grpc -> do