mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-12-25 11:19:44 +01:00
Merge pull request #9 from connor/joel/low-level-testing
Prelim cleanup and reorg
This commit is contained in:
commit
e039adc2b7
9 changed files with 410 additions and 389 deletions
|
@ -56,7 +56,7 @@ library
|
|||
, grpc/impl/codegen/slice.h
|
||||
build-tools: c2hs
|
||||
default-language: Haskell2010
|
||||
ghc-options: -Wall -fwarn-incomplete-patterns
|
||||
ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind
|
||||
include-dirs: include
|
||||
hs-source-dirs: src
|
||||
default-extensions: CPP
|
||||
|
@ -76,9 +76,11 @@ test-suite test
|
|||
, tasty >= 0.11 && <0.12
|
||||
, tasty-hunit >= 0.9 && <0.10
|
||||
, containers ==0.5.*
|
||||
other-modules: LowLevelTests
|
||||
other-modules:
|
||||
LowLevelTests,
|
||||
UnsafeTests
|
||||
default-language: Haskell2010
|
||||
ghc-options: -Wall -fwarn-incomplete-patterns -g -threaded
|
||||
ghc-options: -Wall -fwarn-incomplete-patterns -fno-warn-unused-do-bind -g -threaded
|
||||
hs-source-dirs: tests
|
||||
main-is: Properties.hs
|
||||
type: exitcode-stdio-1.0
|
||||
|
|
|
@ -107,3 +107,9 @@ destroyCall call@ServerCall{..} = do
|
|||
forM_ callDetails C.destroyCallDetails
|
||||
grpcDebug $ "destroying deadline." ++ show callDeadline
|
||||
forM_ callDeadline C.timespecDestroy
|
||||
|
||||
_nowarn_unused :: a
|
||||
_nowarn_unused =
|
||||
castPtr `undefined`
|
||||
(peek :: Ptr Int -> IO Int) `undefined`
|
||||
()
|
||||
|
|
|
@ -6,8 +6,8 @@
|
|||
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
module Network.GRPC.LowLevel.CompletionQueue (
|
||||
CompletionQueue
|
||||
module Network.GRPC.LowLevel.CompletionQueue
|
||||
( CompletionQueue
|
||||
, withCompletionQueue
|
||||
, createCompletionQueue
|
||||
, shutdownCompletionQueue
|
||||
|
@ -16,33 +16,35 @@ module Network.GRPC.LowLevel.CompletionQueue (
|
|||
, channelCreateRegisteredCall
|
||||
, channelCreateCall
|
||||
, TimeoutSeconds
|
||||
, eventSuccess
|
||||
, isEventSuccessful
|
||||
, serverRegisterCompletionQueue
|
||||
, serverShutdownAndNotify
|
||||
, serverRequestRegisteredCall
|
||||
, serverRequestCall
|
||||
, newTag
|
||||
) where
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (forkIO, threadDelay)
|
||||
import Control.Concurrent.STM (atomically, retry, check)
|
||||
import Control.Concurrent.STM.TVar (TVar, newTVarIO, modifyTVar',
|
||||
readTVar, writeTVar)
|
||||
import Control.Exception (bracket)
|
||||
import Data.IORef (IORef, newIORef, atomicModifyIORef')
|
||||
import Data.List (intersperse)
|
||||
import Foreign.Marshal.Alloc (malloc, free)
|
||||
import Foreign.Ptr (nullPtr, plusPtr)
|
||||
import Foreign.Storable (peek)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import Control.Concurrent (forkIO, threadDelay)
|
||||
import Control.Concurrent.STM (atomically, check, retry)
|
||||
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO,
|
||||
readTVar, writeTVar)
|
||||
import Control.Exception (bracket)
|
||||
import Data.IORef (IORef, atomicModifyIORef',
|
||||
newIORef)
|
||||
import Data.List (intersperse)
|
||||
import Foreign.Marshal.Alloc (free, malloc)
|
||||
import Foreign.Ptr (nullPtr, plusPtr)
|
||||
import Foreign.Storable (peek)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.Constants as C
|
||||
import qualified Network.GRPC.Unsafe.Time as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import System.Timeout (timeout)
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import qualified Network.GRPC.Unsafe.Time as C
|
||||
import System.Timeout (timeout)
|
||||
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
|
||||
-- NOTE: the concurrency requirements for a CompletionQueue are a little
|
||||
-- complicated. There are two read operations: next and pluck. We can either
|
||||
|
@ -69,7 +71,7 @@ import Network.GRPC.LowLevel.Call
|
|||
-- are used to wait for batches gRPC operations ('Op's) to finish running, as
|
||||
-- well as wait for various other operations, such as server shutdown, pinging,
|
||||
-- checking to see if we've been disconnected, and so forth.
|
||||
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
|
||||
data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
|
||||
-- ^ All access to this field must be
|
||||
-- guarded by a check of 'shuttingDown'.
|
||||
currentPluckers :: TVar Int,
|
||||
|
@ -78,15 +80,15 @@ data CompletionQueue = CompletionQueue {unsafeCQ :: C.CompletionQueue,
|
|||
-- queue.
|
||||
-- The max value is set by gRPC in
|
||||
-- 'C.maxCompletionQueuePluckers'
|
||||
currentPushers :: TVar Int,
|
||||
currentPushers :: TVar Int,
|
||||
-- ^ Used to prevent new work from
|
||||
-- being pushed onto the queue when
|
||||
-- the queue begins to shut down.
|
||||
shuttingDown :: TVar Bool,
|
||||
shuttingDown :: TVar Bool,
|
||||
-- ^ Used to prevent new pluck calls on
|
||||
-- the queue when the queue begins to
|
||||
-- shut down.
|
||||
nextTag :: IORef Int
|
||||
nextTag :: IORef Int
|
||||
-- ^ Used to supply unique tags for work
|
||||
-- items pushed onto the queue.
|
||||
}
|
||||
|
@ -159,13 +161,14 @@ eventToError (C.Event C.QueueShutdown _ _) = Left GRPCIOShutdown
|
|||
eventToError (C.Event C.QueueTimeout _ _) = Left GRPCIOTimeout
|
||||
eventToError _ = Left GRPCIOUnknownError
|
||||
|
||||
isFailedEvent :: C.Event -> Bool
|
||||
isFailedEvent C.Event{..} = (eventCompletionType /= C.OpComplete)
|
||||
|| not eventSuccess
|
||||
-- | Returns true iff the given grpc_event was a success.
|
||||
isEventSuccessful :: C.Event -> Bool
|
||||
isEventSuccessful (C.Event C.OpComplete True _) = True
|
||||
isEventSuccessful _ = False
|
||||
|
||||
-- | Waits for the given number of seconds for the given tag to appear on the
|
||||
-- completion queue. Throws 'GRPCIOShutdown' if the completion queue is shutting
|
||||
--down and cannot handle new requests.
|
||||
-- down and cannot handle new requests.
|
||||
pluck :: CompletionQueue -> C.Tag -> TimeoutSeconds
|
||||
-> IO (Either GRPCIOError ())
|
||||
pluck cq@CompletionQueue{..} tag waitSeconds = do
|
||||
|
@ -175,9 +178,7 @@ pluck cq@CompletionQueue{..} tag waitSeconds = do
|
|||
C.withDeadlineSeconds waitSeconds $ \deadline -> do
|
||||
ev <- C.grpcCompletionQueuePluck unsafeCQ tag deadline C.reserved
|
||||
grpcDebug $ "pluck: finished. Event: " ++ show ev
|
||||
if isFailedEvent ev
|
||||
then return $ eventToError ev
|
||||
else return $ Right ()
|
||||
return $ if isEventSuccessful ev then Right () else eventToError ev
|
||||
|
||||
-- TODO: I'm thinking it might be easier to use 'Either' uniformly everywhere
|
||||
-- even when it's isomorphic to 'Maybe'. If that doesn't turn out to be the
|
||||
|
@ -207,7 +208,7 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
|
|||
atomically $ readTVar currentPluckers >>= \x -> check (x == 0)
|
||||
--drain the queue
|
||||
C.grpcCompletionQueueShutdown unsafeCQ
|
||||
loopRes <- timeout (5*10^6) drainLoop
|
||||
loopRes <- timeout (5*10^(6::Int)) drainLoop
|
||||
case loopRes of
|
||||
Nothing -> return $ Left GRPCIOShutdownFailure
|
||||
Just () -> C.grpcCompletionQueueDestroy unsafeCQ >> return (Right ())
|
||||
|
@ -221,11 +222,6 @@ shutdownCompletionQueue (CompletionQueue{..}) = do
|
|||
C.QueueTimeout -> drainLoop
|
||||
C.OpComplete -> drainLoop
|
||||
|
||||
-- | Returns true iff the given grpc_event was a success.
|
||||
eventSuccess :: C.Event -> Bool
|
||||
eventSuccess (C.Event C.OpComplete True _) = True
|
||||
eventSuccess _ = False
|
||||
|
||||
channelCreateRegisteredCall :: C.Channel -> C.Call -> C.PropagationMask
|
||||
-> CompletionQueue -> C.CallHandle
|
||||
-> C.CTimeSpecPtr -> IO (Either GRPCIOError Call)
|
||||
|
@ -290,7 +286,7 @@ serverRequestRegisteredCall
|
|||
return $ Right assembledCall
|
||||
-- TODO: see TODO for failureCleanup in serverRequestCall.
|
||||
where failureCleanup deadline callPtr metadataArrayPtr bbPtr = forkIO $ do
|
||||
threadDelay (30*10^6)
|
||||
threadDelaySecs 30
|
||||
grpcDebug "serverRequestRegisteredCall: doing delayed cleanup."
|
||||
C.timespecDestroy deadline
|
||||
free callPtr
|
||||
|
@ -340,7 +336,7 @@ serverRequestCall server cq@CompletionQueue{..} timeLimit =
|
|||
-- we sleep for a while before freeing the objects. We should find a
|
||||
-- permanent solution that's more robust.
|
||||
where failureCleanup callPtr callDetails metadataArrayPtr = forkIO $ do
|
||||
threadDelay (30*10^6)
|
||||
threadDelaySecs 30
|
||||
grpcDebug "serverRequestCall: doing delayed cleanup."
|
||||
free callPtr
|
||||
C.destroyCallDetails callDetails
|
||||
|
@ -356,3 +352,6 @@ serverRegisterCompletionQueue server CompletionQueue{..} =
|
|||
serverShutdownAndNotify :: C.Server -> CompletionQueue -> C.Tag -> IO ()
|
||||
serverShutdownAndNotify server CompletionQueue{..} tag =
|
||||
C.grpcServerShutdownAndNotify server unsafeCQ tag
|
||||
|
||||
threadDelaySecs :: Int -> IO ()
|
||||
threadDelaySecs = threadDelay . (* 10^(6::Int))
|
||||
|
|
|
@ -51,7 +51,7 @@ grpcDebug :: String -> IO ()
|
|||
grpcDebug str = do tid <- myThreadId
|
||||
putStrLn $ (show tid) ++ ": " ++ str
|
||||
#else
|
||||
grpcDebug str = return ()
|
||||
grpcDebug _ = return ()
|
||||
#endif
|
||||
|
||||
{-
|
||||
|
|
|
@ -1,26 +1,27 @@
|
|||
{-# LANGUAGE RecordWildCards #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
module Network.GRPC.LowLevel.Op where
|
||||
|
||||
import Control.Exception
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.String (IsString)
|
||||
import Foreign.C.String (CString)
|
||||
import Foreign.C.Types (CInt)
|
||||
import Foreign.Marshal.Alloc (malloc, mallocBytes, free)
|
||||
import Foreign.Ptr (Ptr, nullPtr)
|
||||
import Foreign.Storable (peek, poke)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.String (IsString)
|
||||
import Foreign.C.String (CString)
|
||||
import Foreign.C.Types (CInt)
|
||||
import Foreign.Marshal.Alloc (free, malloc,
|
||||
mallocBytes)
|
||||
import Foreign.Ptr (Ptr, nullPtr)
|
||||
import Foreign.Storable (peek, poke)
|
||||
import qualified Network.GRPC.Unsafe as C ()
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.CompletionQueue
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.CompletionQueue
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
|
||||
type MetadataMap = M.Map B.ByteString B.ByteString
|
||||
|
||||
|
@ -224,3 +225,6 @@ runOps call cq ops timeLimit =
|
|||
grpcDebug "runOps: got good op; starting."
|
||||
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
|
||||
Left err -> return $ Left err
|
||||
|
||||
_nowarn_unused :: a
|
||||
_nowarn_unused = undefined nullPtr
|
||||
|
|
|
@ -2,26 +2,31 @@
|
|||
|
||||
module Network.GRPC.LowLevel.Server where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Exception (bracket, finally)
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Exception (bracket, finally)
|
||||
import Control.Monad
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Foreign.Ptr (nullPtr)
|
||||
import Foreign.Storable (peek)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Foreign.Ptr (nullPtr)
|
||||
import Foreign.Storable (peek)
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
|
||||
pluck, serverRegisterCompletionQueue, serverShutdownAndNotify,
|
||||
createCompletionQueue, shutdownCompletionQueue, TimeoutSeconds,
|
||||
serverRequestRegisteredCall, serverRequestCall)
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.CompletionQueue (CompletionQueue,
|
||||
TimeoutSeconds,
|
||||
createCompletionQueue,
|
||||
pluck,
|
||||
serverRegisterCompletionQueue,
|
||||
serverRequestCall,
|
||||
serverRequestRegisteredCall,
|
||||
serverShutdownAndNotify,
|
||||
shutdownCompletionQueue)
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import Network.GRPC.LowLevel.Op
|
||||
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
|
||||
-- | Wraps various gRPC state needed to run a server.
|
||||
data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue,
|
||||
|
@ -30,10 +35,10 @@ data Server = Server {internalServer :: C.Server, serverCQ :: CompletionQueue,
|
|||
-- | Configuration needed to start a server. There might be more fields that
|
||||
-- need to be added to this in the future.
|
||||
data ServerConfig =
|
||||
ServerConfig {hostName :: Host,
|
||||
ServerConfig {hostName :: Host,
|
||||
-- ^ Name of the host the server is running on. Not sure
|
||||
-- how this is used. Setting to "localhost" works fine in tests.
|
||||
port :: Int,
|
||||
port :: Int,
|
||||
-- ^ Port to listen for requests on.
|
||||
methodsToRegister :: [(MethodName, Host, GRPCMethodType)]
|
||||
-- ^ List of (method name, method host, method type) tuples
|
||||
|
@ -199,7 +204,7 @@ serverHandleNormalRegisteredCall :: Server
|
|||
-- metadata and returns a response body and
|
||||
-- metadata.
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverHandleNormalRegisteredCall s@Server{..} rm timeLimit initMetadata f = do
|
||||
serverHandleNormalRegisteredCall s@Server{..} rm timeLimit srvMetadata f = do
|
||||
-- TODO: we use this timeLimit twice, so the max time spent is 2*timeLimit.
|
||||
-- Should we just hard-code time limits instead? Not sure if client
|
||||
-- programmer cares, since this function will likely just be put in a loop
|
||||
|
@ -229,16 +234,16 @@ serverHandleNormalRegisteredCall s@Server{..} rm timeLimit initMetadata f = do
|
|||
-- | Handle one unregistered call.
|
||||
serverHandleNormalCall :: Server -> TimeoutSeconds
|
||||
-> MetadataMap
|
||||
-- ^ Initial metadata.
|
||||
-- ^ Initial server metadata.
|
||||
-> (ByteString -> MetadataMap
|
||||
-> IO (ByteString, MetadataMap, StatusDetails))
|
||||
-- ^ Handler function takes a request body and
|
||||
-- metadata and returns a response body and metadata.
|
||||
-> IO (Either GRPCIOError ())
|
||||
serverHandleNormalCall s@Server{..} timeLimit initMetadata f = do
|
||||
serverHandleNormalCall s@Server{..} timeLimit srvMetadata f = do
|
||||
withServerCall s timeLimit $ \call -> do
|
||||
grpcDebug "serverHandleNormalCall: starting batch."
|
||||
let recvOps = serverOpsGetNormalCall initMetadata
|
||||
let recvOps = serverOpsGetNormalCall srvMetadata
|
||||
opResults <- runOps call serverCQ recvOps timeLimit
|
||||
case opResults of
|
||||
Left x -> return $ Left x
|
||||
|
@ -255,3 +260,6 @@ serverHandleNormalCall s@Server{..} timeLimit initMetadata f = do
|
|||
Right _ -> grpcDebug "serverHandleNormalCall: ops done."
|
||||
>> return (Right ())
|
||||
x -> error $ "impossible pattern match: " ++ show x
|
||||
|
||||
_nowarn_unused :: a
|
||||
_nowarn_unused = undefined threadDelay
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module LowLevelTests where
|
||||
module LowLevelTests (lowLevelTests) where
|
||||
|
||||
import Control.Concurrent.Async (withAsync, wait)
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Control.Concurrent.Async
|
||||
import Control.Monad
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.Map as M
|
||||
import Network.GRPC.LowLevel
|
||||
import Test.Tasty
|
||||
import Test.Tasty.HUnit ((@?=), testCase)
|
||||
import Test.Tasty.HUnit as HU (testCase, (@?=))
|
||||
|
||||
lowLevelTests :: TestTree
|
||||
lowLevelTests = testGroup "Unit tests of low-level Haskell library"
|
||||
|
@ -23,85 +24,79 @@ lowLevelTests = testGroup "Unit tests of low-level Haskell library"
|
|||
, testPayloadLowLevelUnregistered
|
||||
]
|
||||
|
||||
dummyMeta :: M.Map ByteString ByteString
|
||||
dummyMeta = M.fromList [("foo","bar")]
|
||||
|
||||
testGRPCBracket :: TestTree
|
||||
testGRPCBracket = testCase "No errors starting and stopping GRPC" $
|
||||
withGRPC $ const $ return ()
|
||||
testGRPCBracket = grpcTest "Start/stop GRPC" nop
|
||||
|
||||
testCompletionQueueCreateDestroy :: TestTree
|
||||
testCompletionQueueCreateDestroy =
|
||||
testCase "No errors creating and destroying a CQ" $ withGRPC $ \grpc ->
|
||||
withCompletionQueue grpc $ const (return ())
|
||||
grpcTest "Create/destroy completion queue" $ \grpc -> do
|
||||
withCompletionQueue grpc nop
|
||||
|
||||
testServerCreateDestroy :: TestTree
|
||||
testServerCreateDestroy =
|
||||
testCase "No errors when starting and stopping a server" $
|
||||
withGRPC $ \grpc -> withServer grpc (ServerConfig "localhost" 50051 [])
|
||||
(const $ return ())
|
||||
grpcTest "Server - start/stop" $ \grpc -> do
|
||||
withServer grpc (ServerConfig "localhost" 50051 []) nop
|
||||
|
||||
testClientCreateDestroy :: TestTree
|
||||
testClientCreateDestroy =
|
||||
testCase "No errors when starting and stopping a client" $
|
||||
withGRPC $ \grpc -> withClient grpc (ClientConfig "localhost" 50051)
|
||||
(const $ return ())
|
||||
grpcTest "Client - start/stop" $ \grpc -> do
|
||||
withClient grpc (ClientConfig "localhost" 50051) nop
|
||||
|
||||
testPayloadLowLevelServer :: GRPC -> IO ()
|
||||
testPayloadLowLevelServer grpc = do
|
||||
payloadLowLevelServer :: TestServer
|
||||
payloadLowLevelServer = TestServer $ \grpc -> do
|
||||
let conf = (ServerConfig "localhost" 50051 [("/foo", "localhost", Normal)])
|
||||
withServer grpc conf $ \server -> do
|
||||
let method = head (registeredMethods server)
|
||||
result <- serverHandleNormalRegisteredCall server method 11 M.empty $
|
||||
\reqBody reqMeta ->
|
||||
\_reqBody _reqMeta ->
|
||||
return ("reply test", dummyMeta, dummyMeta,
|
||||
StatusDetails "details string")
|
||||
case result of
|
||||
Left err -> error $ show err
|
||||
Right _ -> return ()
|
||||
|
||||
testPayloadLowLevelClient :: GRPC -> IO ()
|
||||
testPayloadLowLevelClient grpc =
|
||||
payloadLowLevelClient :: TestClient
|
||||
payloadLowLevelClient = TestClient $ \grpc ->
|
||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||
method <- clientRegisterMethod client "/foo" "localhost" Normal
|
||||
putStrLn "registered method on client."
|
||||
reqResult <- clientRegisteredRequest client method 10 "Hello!" M.empty
|
||||
case reqResult of
|
||||
Left x -> error $ "Client got error: " ++ show x
|
||||
Right (NormalRequestResult respBody initMeta trailingMeta respCode details) -> do
|
||||
Right (NormalRequestResult respBody _initMeta _trailingMeta respCode details) -> do
|
||||
details @?= "details string"
|
||||
respBody @?= "reply test"
|
||||
respCode @?= GrpcStatusOk
|
||||
|
||||
testPayloadLowLevelClientUnregistered :: GRPC -> IO ()
|
||||
testPayloadLowLevelClientUnregistered grpc = do
|
||||
payloadLowLevelClientUnregistered :: TestClient
|
||||
payloadLowLevelClientUnregistered = TestClient $ \grpc -> do
|
||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||
reqResult <- clientRequest client "/foo" "localhost" 10 "Hello!" M.empty
|
||||
case reqResult of
|
||||
Left x -> error $ "Client got error: " ++ show x
|
||||
Right (NormalRequestResult
|
||||
respBody initMeta trailingMeta respCode details) -> do
|
||||
respBody _initMeta _trailingMeta respCode details) -> do
|
||||
respBody @?= "reply test"
|
||||
respCode @?= GrpcStatusOk
|
||||
details @?= "details string"
|
||||
|
||||
testPayloadLowLevelServerUnregistered :: GRPC -> IO ()
|
||||
testPayloadLowLevelServerUnregistered grpc = do
|
||||
payloadLowLevelServerUnregistered :: TestServer
|
||||
payloadLowLevelServerUnregistered = TestServer $ \grpc -> do
|
||||
withServer grpc (ServerConfig "localhost" 50051 []) $ \server -> do
|
||||
result <- serverHandleNormalCall server 11 M.empty $
|
||||
\reqBody reqMeta -> return ("reply test", M.empty,
|
||||
StatusDetails "details string")
|
||||
\_reqBody _reqMeta -> return ("reply test", M.empty,
|
||||
StatusDetails "details string")
|
||||
case result of
|
||||
Left x -> error $ show x
|
||||
Right _ -> return ()
|
||||
|
||||
testClientRequestNoServer :: TestTree
|
||||
testClientRequestNoServer = testCase "request times out when no server " $ do
|
||||
withGRPC $ \grpc -> do
|
||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||
method <- clientRegisterMethod client "/foo" "localhost" Normal
|
||||
reqResult <- clientRegisteredRequest client method 1 "Hello" M.empty
|
||||
reqResult @?= (Left GRPCIOTimeout)
|
||||
testClientRequestNoServer =
|
||||
grpcTest "Client - request timeout when server DNE" $ \grpc -> do
|
||||
withClient grpc (ClientConfig "localhost" 50051) $ \client -> do
|
||||
method <- clientRegisterMethod client "/foo" "localhost" Normal
|
||||
reqResult <- clientRegisteredRequest client method 1 "Hello" M.empty
|
||||
reqResult @?= (Left GRPCIOTimeout)
|
||||
|
||||
testServerAwaitNoClient :: TestTree
|
||||
testServerAwaitNoClient = testCase "server wait times out when no client " $ do
|
||||
|
@ -126,39 +121,52 @@ testServerUnregisteredAwaitNoClient =
|
|||
Right _ -> return ()
|
||||
|
||||
testPayloadLowLevel :: TestTree
|
||||
testPayloadLowLevel = testCase "LowLevel Haskell library request/response " $ do
|
||||
withGRPC $ \grpc -> do
|
||||
withAsync (testPayloadLowLevelServer grpc) $ \a1 -> do
|
||||
withAsync (testPayloadLowLevelClient grpc) $ \a2 -> do
|
||||
wait a1
|
||||
wait a2
|
||||
testPayloadLowLevel =
|
||||
grpcTest "Client/Server - low-level (registered) request/response" $
|
||||
runClientServer payloadLowLevelClient payloadLowLevelServer
|
||||
|
||||
testPayloadLowLevelUnregistered :: TestTree
|
||||
testPayloadLowLevelUnregistered =
|
||||
testCase "LowLevel Haskell library unregistered request/response " $ do
|
||||
withGRPC $ \grpc -> do
|
||||
withAsync (testPayloadLowLevelServerUnregistered grpc) $ \a1 ->
|
||||
withAsync (testPayloadLowLevelClientUnregistered grpc) $ \a2 -> do
|
||||
wait a1
|
||||
wait a2
|
||||
grpcTest "Client/Server - low-level unregistered request/response" $
|
||||
runClientServer payloadLowLevelClientUnregistered payloadLowLevelServerUnregistered
|
||||
|
||||
testWithServerCall :: TestTree
|
||||
testWithServerCall =
|
||||
testCase "Creating and destroying a call: no errors. " $
|
||||
withGRPC $ \grpc -> do
|
||||
let conf = ServerConfig "localhost" 50051 []
|
||||
withServer grpc conf $ \server -> do
|
||||
result <- withServerCall server 1 $ const $ return $ Right ()
|
||||
result @?= Left GRPCIOTimeout
|
||||
grpcTest "Server - Create/destroy call" $ \grpc -> do
|
||||
let conf = ServerConfig "localhost" 50051 []
|
||||
withServer grpc conf $ \server -> do
|
||||
result <- withServerCall server 1 $ const $ return $ Right ()
|
||||
result @?= Left GRPCIOTimeout
|
||||
|
||||
testWithClientCall :: TestTree
|
||||
testWithClientCall =
|
||||
testCase "Creating and destroying a client call: no errors. " $
|
||||
withGRPC $ \grpc -> do
|
||||
let conf = ClientConfig "localhost" 50051
|
||||
withClient grpc conf $ \client -> do
|
||||
result <- withClientCall client "foo" "localhost" 10 $
|
||||
const $ return $ Right ()
|
||||
case result of
|
||||
Left err -> error $ show err
|
||||
Right _ -> return ()
|
||||
grpcTest "Client - Create/destroy call" $ \grpc -> do
|
||||
let conf = ClientConfig "localhost" 50051
|
||||
withClient grpc conf $ \client -> do
|
||||
result <- withClientCall client "foo" "localhost" 10 $
|
||||
const $ return $ Right ()
|
||||
case result of
|
||||
Left err -> error $ show err
|
||||
Right _ -> return ()
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
-- Utilities and helpers
|
||||
|
||||
dummyMeta :: M.Map ByteString ByteString
|
||||
dummyMeta = M.fromList [("foo","bar")]
|
||||
|
||||
nop :: Monad m => a -> m ()
|
||||
nop = const (return ())
|
||||
|
||||
-- | Defines a general-purpose GRPC unit test
|
||||
grpcTest :: TestName -> (GRPC -> IO ()) -> TestTree
|
||||
grpcTest nm = testCase nm . withGRPC
|
||||
|
||||
newtype TestClient = TestClient (GRPC -> IO ())
|
||||
newtype TestServer = TestServer (GRPC -> IO ())
|
||||
|
||||
-- | Concurrently executes the given 'TestClient' and 'TestServer' TODO: We may
|
||||
-- want to add toplevel timeouts and better error reporting here.
|
||||
runClientServer :: TestClient -> TestServer -> GRPC -> IO ()
|
||||
runClientServer (TestClient c) (TestServer s) grpc =
|
||||
void $ s grpc `concurrently` c grpc
|
||||
|
|
|
@ -1,245 +1,9 @@
|
|||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Network.GRPC.Unsafe
|
||||
import Network.GRPC.Unsafe.Slice
|
||||
import Network.GRPC.Unsafe.ByteBuffer
|
||||
import Network.GRPC.Unsafe.Time
|
||||
import Network.GRPC.Unsafe.Metadata
|
||||
import Network.GRPC.Unsafe.Op
|
||||
import Network.GRPC.Unsafe.Constants
|
||||
import qualified Data.ByteString as B
|
||||
import Foreign.Marshal.Alloc
|
||||
import Foreign.Storable
|
||||
import Foreign.Ptr
|
||||
import Test.Tasty
|
||||
import Test.Tasty.HUnit as HU
|
||||
|
||||
import LowLevelTests
|
||||
|
||||
roundtripSlice :: B.ByteString -> TestTree
|
||||
roundtripSlice bs = testCase "Slice C bindings roundtrip" $ do
|
||||
slice <- byteStringToSlice bs
|
||||
unslice <- sliceToByteString slice
|
||||
bs HU.@?= unslice
|
||||
freeSlice slice
|
||||
|
||||
roundtripByteBuffer :: B.ByteString -> TestTree
|
||||
roundtripByteBuffer bs = testCase "ByteBuffer C bindings roundtrip" $ do
|
||||
slice <- byteStringToSlice bs
|
||||
buffer <- grpcRawByteBufferCreate slice 1
|
||||
reader <- byteBufferReaderCreate buffer
|
||||
readSlice <- grpcByteBufferReaderReadall reader
|
||||
bs' <- sliceToByteString readSlice
|
||||
bs' HU.@?= bs
|
||||
--clean up
|
||||
freeSlice slice
|
||||
byteBufferReaderDestroy reader
|
||||
grpcByteBufferDestroy buffer
|
||||
freeSlice readSlice
|
||||
|
||||
currTimeMillis :: ClockType -> IO Int
|
||||
currTimeMillis t = do
|
||||
gprT <- gprNow t
|
||||
tMillis <- gprTimeToMillis gprT
|
||||
timespecDestroy gprT
|
||||
return tMillis
|
||||
|
||||
testNow :: TestTree
|
||||
testNow = testCase "create and destroy various clock types" $ do
|
||||
_ <- currTimeMillis GprClockMonotonic
|
||||
_ <- currTimeMillis GprClockRealtime
|
||||
_ <- currTimeMillis GprClockPrecise
|
||||
return ()
|
||||
|
||||
testMetadata :: TestTree
|
||||
testMetadata = testCase "metadata setter/getter C bindings roundtrip" $ do
|
||||
m <- metadataAlloc 3
|
||||
setMetadataKeyVal "hello" "world" m 0
|
||||
setMetadataKeyVal "foo" "bar" m 1
|
||||
setMetadataKeyVal "Haskell" "Curry" m 2
|
||||
k0 <- getMetadataKey m 0
|
||||
v0 <- getMetadataVal m 0
|
||||
k1 <- getMetadataKey m 1
|
||||
v1 <- getMetadataVal m 1
|
||||
k2 <- getMetadataKey m 2
|
||||
v2 <- getMetadataVal m 2
|
||||
k0 HU.@?= "hello"
|
||||
v0 HU.@?= "world"
|
||||
k1 HU.@?= "foo"
|
||||
v1 HU.@?= "bar"
|
||||
k2 HU.@?= "Haskell"
|
||||
v2 HU.@?= "Curry"
|
||||
metadataFree m
|
||||
|
||||
assertCqEventComplete :: Event -> IO ()
|
||||
assertCqEventComplete e = do
|
||||
eventCompletionType e HU.@?= OpComplete
|
||||
eventSuccess e HU.@?= True
|
||||
|
||||
testPayloadClient :: IO ()
|
||||
testPayloadClient = do
|
||||
client <- grpcInsecureChannelCreate "localhost:50051" nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
withMetadataArrayPtr $ \initialMetadataRecv -> do
|
||||
withMetadataArrayPtr $ \trailingMetadataRecv -> do
|
||||
withByteBufferPtr $ \clientRecvBB -> do
|
||||
deadline <- secondsToDeadline 5
|
||||
pluckDeadline <- secondsToDeadline 10
|
||||
clientCall <- grpcChannelCreateCall
|
||||
client (Call nullPtr) propagateDefaults cq
|
||||
"/foo" "localhost" deadline reserved
|
||||
--send request
|
||||
withOpArray 6 $ \ops -> do
|
||||
opSendInitialMetadataEmpty ops 0
|
||||
withByteStringAsByteBuffer "hello world" $ \requestPayload -> do
|
||||
opSendMessage ops 1 requestPayload
|
||||
opSendCloseClient ops 2
|
||||
opRecvInitialMetadata ops 3 initialMetadataRecv
|
||||
opRecvMessage ops 4 clientRecvBB
|
||||
statusCodePtr <- createStatusCodePtr
|
||||
let cstringCapacity = 32
|
||||
cStringPtr <- malloc
|
||||
cstring <- mallocBytes cstringCapacity
|
||||
poke cStringPtr cstring
|
||||
opRecvStatusClient ops 5 trailingMetadataRecv statusCodePtr
|
||||
cStringPtr
|
||||
cstringCapacity
|
||||
--send client request
|
||||
requestError <- grpcCallStartBatch clientCall ops 6 (tag 1) reserved
|
||||
clientRequestCqEvent <- grpcCompletionQueuePluck
|
||||
cq (tag 1) pluckDeadline reserved
|
||||
assertCqEventComplete clientRequestCqEvent
|
||||
requestError HU.@?= CallOk
|
||||
free cstring
|
||||
free cStringPtr
|
||||
destroyStatusCodePtr statusCodePtr
|
||||
--verify response received
|
||||
responseRecv <- peek clientRecvBB
|
||||
responseRecvBS <- copyByteBufferToByteString responseRecv
|
||||
responseRecvBS HU.@?= "hello you"
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCallDestroy clientCall
|
||||
--TODO: the grpc test drains the cq here
|
||||
grpcCompletionQueueDestroy cq
|
||||
grpcChannelDestroy client
|
||||
|
||||
testPayloadServer :: IO ()
|
||||
testPayloadServer = do
|
||||
server <- grpcServerCreate nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
grpcServerRegisterCompletionQueue server cq reserved
|
||||
_ <- grpcServerAddInsecureHttp2Port server "localhost:50051"
|
||||
grpcServerStart server
|
||||
serverCallPtr <- malloc
|
||||
withMetadataArrayPtr $ \requestMetadataRecv -> do
|
||||
withByteBufferPtr $ \recvBufferPtr -> do
|
||||
callDetails <- createCallDetails
|
||||
requestMetadataRecv' <- peek requestMetadataRecv
|
||||
recvRequestError <- grpcServerRequestCall
|
||||
server serverCallPtr callDetails
|
||||
requestMetadataRecv' cq cq (tag 101)
|
||||
pluckDeadline' <- secondsToDeadline 10
|
||||
requestCallCqEvent <- grpcCompletionQueuePluck cq (tag 101)
|
||||
pluckDeadline'
|
||||
reserved
|
||||
assertCqEventComplete requestCallCqEvent
|
||||
recvRequestError HU.@?= CallOk
|
||||
destroyCallDetails callDetails
|
||||
--receive request
|
||||
withOpArray 2 $ \recvOps -> do
|
||||
opSendInitialMetadataEmpty recvOps 0
|
||||
opRecvMessage recvOps 1 recvBufferPtr
|
||||
serverCall <- peek serverCallPtr
|
||||
recvBatchError <- grpcCallStartBatch serverCall recvOps 2
|
||||
(tag 102) reserved
|
||||
recvBatchError HU.@?= CallOk
|
||||
pluckDeadline'' <- secondsToDeadline 10
|
||||
recvCqEvent <- grpcCompletionQueuePluck cq (tag 102)
|
||||
pluckDeadline''
|
||||
reserved
|
||||
assertCqEventComplete recvCqEvent
|
||||
--send response
|
||||
withOpArray 3 $ \respOps -> do
|
||||
withByteStringAsByteBuffer "hello you" $ \respbb -> do
|
||||
cancelledPtr <- malloc
|
||||
opRecvCloseServer respOps 0 cancelledPtr
|
||||
opSendMessage respOps 1 respbb
|
||||
B.useAsCString "ok" $ \detailsStr ->
|
||||
opSendStatusServer respOps 2 0 (MetadataKeyValPtr nullPtr)
|
||||
GrpcStatusOk detailsStr
|
||||
serverCall <- peek serverCallPtr
|
||||
respBatchError <- grpcCallStartBatch serverCall respOps 3
|
||||
(tag 103) reserved
|
||||
respBatchError HU.@?= CallOk
|
||||
pluckDeadline''' <- secondsToDeadline 10
|
||||
respCqEvent <- grpcCompletionQueuePluck cq (tag 103)
|
||||
pluckDeadline'''
|
||||
reserved
|
||||
assertCqEventComplete respCqEvent
|
||||
--verify data was received
|
||||
serverRecv <- peek recvBufferPtr
|
||||
serverRecvBS <- copyByteBufferToByteString serverRecv
|
||||
serverRecvBS HU.@?= "hello world"
|
||||
--shut down
|
||||
grpcServerShutdownAndNotify server cq (tag 0)
|
||||
pluckDeadline'''' <- secondsToDeadline 10
|
||||
shutdownEvent <- grpcCompletionQueuePluck cq (tag 0) pluckDeadline''''
|
||||
reserved
|
||||
assertCqEventComplete shutdownEvent
|
||||
grpcServerCancelAllCalls server
|
||||
grpcServerDestroy server
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCompletionQueueDestroy cq
|
||||
free serverCallPtr
|
||||
|
||||
-- | Straightforward translation of the gRPC core test end2end/tests/payload.c
|
||||
-- This is intended to test the low-level C bindings, so we use only a few
|
||||
-- minimal abstractions on top of it.
|
||||
testPayload :: TestTree
|
||||
testPayload = testCase "low-level C bindings request/response " $ do
|
||||
grpcInit
|
||||
withAsync testPayloadServer $ \a1 -> do
|
||||
withAsync testPayloadClient $ \a2 -> do
|
||||
() <- wait a1
|
||||
wait a2
|
||||
grpcShutdown
|
||||
putStrLn "Done."
|
||||
|
||||
testCreateDestroyMetadata :: TestTree
|
||||
testCreateDestroyMetadata = testCase "create/destroy metadataArrayPtr " $ do
|
||||
grpcInit
|
||||
withMetadataArrayPtr $ const (return ())
|
||||
grpcShutdown
|
||||
|
||||
testCreateDestroyMetadataKeyVals :: TestTree
|
||||
testCreateDestroyMetadataKeyVals = testCase "create/destroy metadata k/vs " $ do
|
||||
grpcInit
|
||||
withMetadataKeyValPtr 10 $ const (return ())
|
||||
grpcShutdown
|
||||
|
||||
testCreateDestroyDeadline :: TestTree
|
||||
testCreateDestroyDeadline = testCase "create/destroy deadline " $ do
|
||||
grpcInit
|
||||
withDeadlineSeconds 10 $ const (return ())
|
||||
grpcShutdown
|
||||
|
||||
unsafeTests :: TestTree
|
||||
unsafeTests = testGroup "Unit tests for unsafe C bindings."
|
||||
[testPayload,
|
||||
roundtripSlice "Hello, world!",
|
||||
roundtripByteBuffer "Hwaet! We gardena in geardagum...",
|
||||
testMetadata,
|
||||
testNow,
|
||||
testCreateDestroyMetadata,
|
||||
testCreateDestroyMetadataKeyVals,
|
||||
testCreateDestroyDeadline
|
||||
]
|
||||
|
||||
allTests :: TestTree
|
||||
allTests = testGroup "All tests"
|
||||
[ unsafeTests,
|
||||
lowLevelTests]
|
||||
import LowLevelTests
|
||||
import Test.Tasty
|
||||
import UnsafeTests
|
||||
|
||||
main :: IO ()
|
||||
main = defaultMain allTests
|
||||
main = defaultMain $ testGroup "GRPC Unit Tests"
|
||||
[ unsafeTests
|
||||
, lowLevelTests
|
||||
]
|
||||
|
|
230
tests/UnsafeTests.hs
Normal file
230
tests/UnsafeTests.hs
Normal file
|
@ -0,0 +1,230 @@
|
|||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module UnsafeTests (unsafeTests) where
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (bracket_)
|
||||
import Control.Monad
|
||||
import qualified Data.ByteString as B
|
||||
import Foreign.Marshal.Alloc
|
||||
import Foreign.Ptr
|
||||
import Foreign.Storable
|
||||
import Network.GRPC.Unsafe
|
||||
import Network.GRPC.Unsafe.ByteBuffer
|
||||
import Network.GRPC.Unsafe.Constants
|
||||
import Network.GRPC.Unsafe.Metadata
|
||||
import Network.GRPC.Unsafe.Op
|
||||
import Network.GRPC.Unsafe.Slice
|
||||
import Network.GRPC.Unsafe.Time
|
||||
import Test.Tasty
|
||||
import Test.Tasty.HUnit as HU (testCase, (@?=))
|
||||
|
||||
unsafeTests :: TestTree
|
||||
unsafeTests = testGroup "Unit tests for unsafe C bindings"
|
||||
[ roundtripSlice "Hello, world!"
|
||||
, roundtripByteBuffer "Hwaet! We gardena in geardagum..."
|
||||
, testMetadata
|
||||
, testNow
|
||||
, testCreateDestroyMetadata
|
||||
, testCreateDestroyMetadataKeyVals
|
||||
, testCreateDestroyDeadline
|
||||
, testPayload
|
||||
]
|
||||
|
||||
roundtripSlice :: B.ByteString -> TestTree
|
||||
roundtripSlice bs = testCase "ByteString slice roundtrip" $ do
|
||||
slice <- byteStringToSlice bs
|
||||
unslice <- sliceToByteString slice
|
||||
bs HU.@?= unslice
|
||||
freeSlice slice
|
||||
|
||||
roundtripByteBuffer :: B.ByteString -> TestTree
|
||||
roundtripByteBuffer bs = testCase "ByteBuffer roundtrip" $ do
|
||||
slice <- byteStringToSlice bs
|
||||
buffer <- grpcRawByteBufferCreate slice 1
|
||||
reader <- byteBufferReaderCreate buffer
|
||||
readSlice <- grpcByteBufferReaderReadall reader
|
||||
bs' <- sliceToByteString readSlice
|
||||
bs' HU.@?= bs
|
||||
-- clean up
|
||||
freeSlice slice
|
||||
byteBufferReaderDestroy reader
|
||||
grpcByteBufferDestroy buffer
|
||||
freeSlice readSlice
|
||||
|
||||
testMetadata :: TestTree
|
||||
testMetadata = testCase "Metadata setter/getter roundtrip" $ do
|
||||
m <- metadataAlloc 3
|
||||
setMetadataKeyVal "hello" "world" m 0
|
||||
setMetadataKeyVal "foo" "bar" m 1
|
||||
setMetadataKeyVal "Haskell" "Curry" m 2
|
||||
k0 <- getMetadataKey m 0
|
||||
v0 <- getMetadataVal m 0
|
||||
k1 <- getMetadataKey m 1
|
||||
v1 <- getMetadataVal m 1
|
||||
k2 <- getMetadataKey m 2
|
||||
v2 <- getMetadataVal m 2
|
||||
k0 HU.@?= "hello"
|
||||
v0 HU.@?= "world"
|
||||
k1 HU.@?= "foo"
|
||||
v1 HU.@?= "bar"
|
||||
k2 HU.@?= "Haskell"
|
||||
v2 HU.@?= "Curry"
|
||||
metadataFree m
|
||||
|
||||
currTimeMillis :: ClockType -> IO Int
|
||||
currTimeMillis t = do
|
||||
gprT <- gprNow t
|
||||
tMillis <- gprTimeToMillis gprT
|
||||
timespecDestroy gprT
|
||||
return tMillis
|
||||
|
||||
testNow :: TestTree
|
||||
testNow = testCase "Create/destroy various clock types" $ do
|
||||
_ <- currTimeMillis GprClockMonotonic
|
||||
_ <- currTimeMillis GprClockRealtime
|
||||
_ <- currTimeMillis GprClockPrecise
|
||||
return ()
|
||||
|
||||
testCreateDestroyMetadata :: TestTree
|
||||
testCreateDestroyMetadata = testCase "Create/destroy metadataArrayPtr" $ do
|
||||
grpc $ withMetadataArrayPtr $ const $ return ()
|
||||
|
||||
testCreateDestroyMetadataKeyVals :: TestTree
|
||||
testCreateDestroyMetadataKeyVals = testCase "Create/destroy metadata key/values" $ do
|
||||
grpc $ withMetadataKeyValPtr 10 $ const $ return ()
|
||||
|
||||
testCreateDestroyDeadline :: TestTree
|
||||
testCreateDestroyDeadline = testCase "Create/destroy deadline" $ do
|
||||
grpc $ withDeadlineSeconds 10 $ const $ return ()
|
||||
|
||||
assertCqEventComplete :: Event -> IO ()
|
||||
assertCqEventComplete e = do
|
||||
eventCompletionType e HU.@?= OpComplete
|
||||
eventSuccess e HU.@?= True
|
||||
|
||||
payloadClient :: IO ()
|
||||
payloadClient = do
|
||||
client <- grpcInsecureChannelCreate "localhost:50051" nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
withMetadataArrayPtr $ \initialMetadataRecv -> do
|
||||
withMetadataArrayPtr $ \trailingMetadataRecv -> do
|
||||
withByteBufferPtr $ \clientRecvBB -> do
|
||||
deadline <- secondsToDeadline 5
|
||||
pluckDeadline <- secondsToDeadline 10
|
||||
clientCall <- grpcChannelCreateCall
|
||||
client (Call nullPtr) propagateDefaults cq
|
||||
"/foo" "localhost" deadline reserved
|
||||
--send request
|
||||
withOpArray 6 $ \ops -> do
|
||||
opSendInitialMetadataEmpty ops 0
|
||||
withByteStringAsByteBuffer "hello world" $ \requestPayload -> do
|
||||
opSendMessage ops 1 requestPayload
|
||||
opSendCloseClient ops 2
|
||||
opRecvInitialMetadata ops 3 initialMetadataRecv
|
||||
opRecvMessage ops 4 clientRecvBB
|
||||
statusCodePtr <- createStatusCodePtr
|
||||
let cstringCapacity = 32
|
||||
cStringPtr <- malloc
|
||||
cstring <- mallocBytes cstringCapacity
|
||||
poke cStringPtr cstring
|
||||
opRecvStatusClient ops 5 trailingMetadataRecv statusCodePtr
|
||||
cStringPtr
|
||||
cstringCapacity
|
||||
--send client request
|
||||
requestError <- grpcCallStartBatch clientCall ops 6 (tag 1) reserved
|
||||
clientRequestCqEvent <- grpcCompletionQueuePluck
|
||||
cq (tag 1) pluckDeadline reserved
|
||||
assertCqEventComplete clientRequestCqEvent
|
||||
requestError HU.@?= CallOk
|
||||
free cstring
|
||||
free cStringPtr
|
||||
destroyStatusCodePtr statusCodePtr
|
||||
--verify response received
|
||||
responseRecv <- peek clientRecvBB
|
||||
responseRecvBS <- copyByteBufferToByteString responseRecv
|
||||
responseRecvBS HU.@?= "hello you"
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCallDestroy clientCall
|
||||
--TODO: the grpc test drains the cq here
|
||||
grpcCompletionQueueDestroy cq
|
||||
grpcChannelDestroy client
|
||||
|
||||
payloadServer :: IO ()
|
||||
payloadServer = do
|
||||
server <- grpcServerCreate nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
grpcServerRegisterCompletionQueue server cq reserved
|
||||
_ <- grpcServerAddInsecureHttp2Port server "localhost:50051"
|
||||
grpcServerStart server
|
||||
serverCallPtr <- malloc
|
||||
withMetadataArrayPtr $ \requestMetadataRecv -> do
|
||||
withByteBufferPtr $ \recvBufferPtr -> do
|
||||
callDetails <- createCallDetails
|
||||
requestMetadataRecv' <- peek requestMetadataRecv
|
||||
recvRequestError <- grpcServerRequestCall
|
||||
server serverCallPtr callDetails
|
||||
requestMetadataRecv' cq cq (tag 101)
|
||||
pluckDeadline' <- secondsToDeadline 10
|
||||
requestCallCqEvent <- grpcCompletionQueuePluck cq (tag 101)
|
||||
pluckDeadline'
|
||||
reserved
|
||||
assertCqEventComplete requestCallCqEvent
|
||||
recvRequestError HU.@?= CallOk
|
||||
destroyCallDetails callDetails
|
||||
--receive request
|
||||
withOpArray 2 $ \recvOps -> do
|
||||
opSendInitialMetadataEmpty recvOps 0
|
||||
opRecvMessage recvOps 1 recvBufferPtr
|
||||
serverCall <- peek serverCallPtr
|
||||
recvBatchError <- grpcCallStartBatch serverCall recvOps 2
|
||||
(tag 102) reserved
|
||||
recvBatchError HU.@?= CallOk
|
||||
pluckDeadline'' <- secondsToDeadline 10
|
||||
recvCqEvent <- grpcCompletionQueuePluck cq (tag 102)
|
||||
pluckDeadline''
|
||||
reserved
|
||||
assertCqEventComplete recvCqEvent
|
||||
--send response
|
||||
withOpArray 3 $ \respOps -> do
|
||||
withByteStringAsByteBuffer "hello you" $ \respbb -> do
|
||||
cancelledPtr <- malloc
|
||||
opRecvCloseServer respOps 0 cancelledPtr
|
||||
opSendMessage respOps 1 respbb
|
||||
B.useAsCString "ok" $ \detailsStr ->
|
||||
opSendStatusServer respOps 2 0 (MetadataKeyValPtr nullPtr)
|
||||
GrpcStatusOk detailsStr
|
||||
serverCall <- peek serverCallPtr
|
||||
respBatchError <- grpcCallStartBatch serverCall respOps 3
|
||||
(tag 103) reserved
|
||||
respBatchError HU.@?= CallOk
|
||||
pluckDeadline''' <- secondsToDeadline 10
|
||||
respCqEvent <- grpcCompletionQueuePluck cq (tag 103)
|
||||
pluckDeadline'''
|
||||
reserved
|
||||
assertCqEventComplete respCqEvent
|
||||
--verify data was received
|
||||
serverRecv <- peek recvBufferPtr
|
||||
serverRecvBS <- copyByteBufferToByteString serverRecv
|
||||
serverRecvBS HU.@?= "hello world"
|
||||
--shut down
|
||||
grpcServerShutdownAndNotify server cq (tag 0)
|
||||
pluckDeadline'''' <- secondsToDeadline 10
|
||||
shutdownEvent <- grpcCompletionQueuePluck cq (tag 0) pluckDeadline''''
|
||||
reserved
|
||||
assertCqEventComplete shutdownEvent
|
||||
grpcServerCancelAllCalls server
|
||||
grpcServerDestroy server
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCompletionQueueDestroy cq
|
||||
free serverCallPtr
|
||||
|
||||
-- | Straightforward translation of the gRPC core test end2end/tests/payload.c
|
||||
-- This is intended to test the low-level C bindings, so we use only a few
|
||||
-- minimal abstractions on top of it.
|
||||
testPayload :: TestTree
|
||||
testPayload = testCase "Unsafe request/response" $ do
|
||||
grpc $ payloadClient `concurrently` payloadServer
|
||||
|
||||
grpc :: IO a -> IO ()
|
||||
grpc = bracket_ grpcInit grpcShutdown . void
|
Loading…
Reference in a new issue