mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-12-25 11:19:44 +01:00
Merge branch 'master' into joel/bugfix/server-request-registered-call-pluck-permission
Conflicts: src/Network/GRPC/LowLevel/CompletionQueue.hs
This commit is contained in:
commit
d24f69176b
6 changed files with 33 additions and 158 deletions
|
@ -127,6 +127,7 @@ test-suite test
|
|||
, pipes ==4.1.*
|
||||
, transformers
|
||||
, safe
|
||||
, clock ==0.6.*
|
||||
other-modules:
|
||||
LowLevelTests,
|
||||
LowLevelTests.Op,
|
||||
|
|
|
@ -38,10 +38,6 @@ debugServerCall ServerCall{..} = do
|
|||
grpcDebug $ "debugServerCall(U): server call: " ++ show ptr
|
||||
grpcDebug $ "debugServerCall(U): metadata: "
|
||||
++ show requestMetadataRecv
|
||||
forM_ parentPtr $ \parentPtr' -> do
|
||||
grpcDebug $ "debugServerCall(U): parent ptr: " ++ show parentPtr'
|
||||
C.Call parent <- peek parentPtr'
|
||||
grpcDebug $ "debugServerCall(U): parent: " ++ show parent
|
||||
grpcDebug $ "debugServerCall(U): deadline: " ++ show callDeadline
|
||||
grpcDebug $ "debugServerCall(U): method: " ++ show callMethod
|
||||
grpcDebug $ "debugServerCall(U): host: " ++ show callHost
|
||||
|
|
|
@ -49,20 +49,20 @@ import Data.List (intersperse)
|
|||
import Foreign.Marshal.Alloc (free, malloc)
|
||||
import Foreign.Ptr (Ptr, nullPtr)
|
||||
import Foreign.Storable (Storable, peek)
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Internal
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import qualified Network.GRPC.Unsafe as C
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
import qualified Network.GRPC.Unsafe.Constants as C
|
||||
import qualified Network.GRPC.Unsafe.Metadata as C
|
||||
import qualified Network.GRPC.Unsafe.Op as C
|
||||
import qualified Network.GRPC.Unsafe.Time as C
|
||||
import System.Clock (Clock (..),
|
||||
getTime)
|
||||
import System.Info (os)
|
||||
import System.Timeout (timeout)
|
||||
|
||||
import Network.GRPC.LowLevel.Call
|
||||
import Network.GRPC.LowLevel.CompletionQueue.Internal
|
||||
import Network.GRPC.LowLevel.GRPC
|
||||
import qualified Network.GRPC.Unsafe.ByteBuffer as C
|
||||
|
||||
withCompletionQueue :: GRPC -> (CompletionQueue -> IO a) -> IO a
|
||||
withCompletionQueue grpc = bracket (createCompletionQueue grpc)
|
||||
shutdownCompletionQueue
|
||||
|
@ -184,6 +184,14 @@ serverRequestCall s cq@CompletionQueue{.. } rm =
|
|||
toBS p = peek p >>= \bb@(C.ByteBuffer rawPtr) ->
|
||||
if | rawPtr == nullPtr -> return Nothing
|
||||
| otherwise -> Just <$> C.copyByteBufferToByteString bb
|
||||
convertDeadline deadline = do
|
||||
deadline' <- C.timeSpec <$> peek deadline
|
||||
--On OS X, gRPC gives us a deadline that is just a delta, so we
|
||||
--convert it to an actual deadline.
|
||||
if os == "darwin"
|
||||
then do now <- getTime Monotonic
|
||||
return $ now + deadline'
|
||||
else return deadline'
|
||||
|
||||
-- | Register the server's completion queue. Must be done before the server is
|
||||
-- started.
|
||||
|
|
|
@ -12,6 +12,7 @@ import System.Clock
|
|||
{#context prefix = "grp" #}
|
||||
|
||||
newtype CTimeSpec = CTimeSpec { timeSpec :: TimeSpec }
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Storable CTimeSpec where
|
||||
sizeOf _ = {#sizeof gpr_timespec #}
|
||||
|
|
|
@ -20,27 +20,7 @@ import Network.GRPC.LowLevel.CompletionQueue
|
|||
|
||||
lowLevelOpTests :: TestTree
|
||||
lowLevelOpTests = testGroup "Synchronous unit tests of low-level Op interface"
|
||||
[testCancelWhileHandling
|
||||
,testCancelFromServer]
|
||||
|
||||
testCancelWhileHandling :: TestTree
|
||||
testCancelWhileHandling =
|
||||
testCase "Client/Server - cancel after handler starts does nothing" $
|
||||
runSerialTest $ \grpc ->
|
||||
withClientServerUnaryCall grpc $
|
||||
\(Client{..}, Server{..}, cc@ClientCall{..}, ServerCall{..}) -> do
|
||||
withOpArrayAndCtxts serverEmptyRecvOps $ \(opArray, ctxts) -> do
|
||||
tag <- newTag serverCQ
|
||||
startBatch serverCQ unServerCall opArray 3 tag
|
||||
pluck serverCQ tag (Just 1)
|
||||
let (OpRecvCloseOnServerContext pcancelled) = last ctxts
|
||||
cancelledBefore <- peek pcancelled
|
||||
cancelledBefore @?= 0
|
||||
clientCallCancel cc
|
||||
threadDelay 1000000
|
||||
cancelledAfter <- peek pcancelled
|
||||
cancelledAfter @?= 0
|
||||
return $ Right ()
|
||||
[testCancelFromServer]
|
||||
|
||||
testCancelFromServer :: TestTree
|
||||
testCancelFromServer =
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
module UnsafeTests (unsafeTests) where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (bracket_)
|
||||
import Control.Monad
|
||||
|
@ -17,8 +18,10 @@ import Network.GRPC.Unsafe.Op
|
|||
import Network.GRPC.Unsafe.Slice
|
||||
import Network.GRPC.Unsafe.Time
|
||||
import Network.GRPC.Unsafe.ChannelArgs
|
||||
import System.Clock
|
||||
import Test.Tasty
|
||||
import Test.Tasty.HUnit as HU (testCase, (@?=))
|
||||
import Test.Tasty.HUnit as HU (testCase, (@?=),
|
||||
assertBool)
|
||||
|
||||
unsafeTests :: TestTree
|
||||
unsafeTests = testGroup "Unit tests for unsafe C bindings"
|
||||
|
@ -26,12 +29,12 @@ unsafeTests = testGroup "Unit tests for unsafe C bindings"
|
|||
, roundtripByteBuffer "Hwaet! We gardena in geardagum..."
|
||||
, roundtripSlice largeByteString
|
||||
, roundtripByteBuffer largeByteString
|
||||
, roundtripTimeSpec (TimeSpec 123 123)
|
||||
, testMetadata
|
||||
, testNow
|
||||
, testCreateDestroyMetadata
|
||||
, testCreateDestroyMetadataKeyVals
|
||||
, testCreateDestroyDeadline
|
||||
, testPayload
|
||||
, testCreateDestroyChannelArgs
|
||||
]
|
||||
|
||||
|
@ -59,6 +62,15 @@ roundtripByteBuffer bs = testCase "ByteBuffer roundtrip" $ do
|
|||
grpcByteBufferDestroy buffer
|
||||
freeSlice readSlice
|
||||
|
||||
roundtripTimeSpec :: TimeSpec -> TestTree
|
||||
roundtripTimeSpec t = testCase "CTimeSpec roundtrip" $ do
|
||||
p <- malloc
|
||||
let c = CTimeSpec t
|
||||
poke p c
|
||||
c' <- peek p
|
||||
c' @?= c
|
||||
free p
|
||||
|
||||
testMetadata :: TestTree
|
||||
testMetadata = testCase "Metadata setter/getter roundtrip" $ do
|
||||
m <- metadataAlloc 3
|
||||
|
@ -115,131 +127,8 @@ assertCqEventComplete e = do
|
|||
eventCompletionType e HU.@?= OpComplete
|
||||
eventSuccess e HU.@?= True
|
||||
|
||||
payloadClient :: IO ()
|
||||
payloadClient = do
|
||||
client <- grpcInsecureChannelCreate "localhost:50051" nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
withMetadataArrayPtr $ \initialMetadataRecv -> do
|
||||
withMetadataArrayPtr $ \trailingMetadataRecv -> do
|
||||
withByteBufferPtr $ \clientRecvBB -> do
|
||||
deadline <- secondsToDeadline 5
|
||||
pluckDeadline <- secondsToDeadline 10
|
||||
clientCall <- grpcChannelCreateCall
|
||||
client (Call nullPtr) propagateDefaults cq
|
||||
"/foo" "localhost" deadline reserved
|
||||
--send request
|
||||
withOpArray 6 $ \ops -> do
|
||||
opSendInitialMetadataEmpty ops 0
|
||||
withByteStringAsByteBuffer "hello world" $ \requestPayload -> do
|
||||
opSendMessage ops 1 requestPayload
|
||||
opSendCloseClient ops 2
|
||||
opRecvInitialMetadata ops 3 initialMetadataRecv
|
||||
opRecvMessage ops 4 clientRecvBB
|
||||
statusCodePtr <- createStatusCodePtr
|
||||
let cstringCapacity = 32
|
||||
cStringPtr <- malloc
|
||||
cstring <- mallocBytes cstringCapacity
|
||||
poke cStringPtr cstring
|
||||
opRecvStatusClient ops 5 trailingMetadataRecv statusCodePtr
|
||||
cStringPtr
|
||||
cstringCapacity
|
||||
--send client request
|
||||
requestError <- grpcCallStartBatch clientCall ops 6 (tag 1) reserved
|
||||
clientRequestCqEvent <- grpcCompletionQueuePluck
|
||||
cq (tag 1) pluckDeadline reserved
|
||||
assertCqEventComplete clientRequestCqEvent
|
||||
requestError HU.@?= CallOk
|
||||
free cstring
|
||||
free cStringPtr
|
||||
destroyStatusCodePtr statusCodePtr
|
||||
--verify response received
|
||||
responseRecv <- peek clientRecvBB
|
||||
let (ByteBuffer rawPtr) = responseRecv
|
||||
if rawPtr == nullPtr
|
||||
then error "Client got null pointer for received response!"
|
||||
else do responseRecvBS <- copyByteBufferToByteString responseRecv
|
||||
responseRecvBS HU.@?= "hello you"
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCallDestroy clientCall
|
||||
--TODO: the grpc test drains the cq here
|
||||
grpcCompletionQueueDestroy cq
|
||||
grpcChannelDestroy client
|
||||
|
||||
payloadServer :: IO ()
|
||||
payloadServer = do
|
||||
server <- grpcServerCreate nullPtr reserved
|
||||
cq <- grpcCompletionQueueCreate reserved
|
||||
grpcServerRegisterCompletionQueue server cq reserved
|
||||
_ <- grpcServerAddInsecureHttp2Port server "localhost:50051"
|
||||
grpcServerStart server
|
||||
serverCallPtr <- malloc
|
||||
withMetadataArrayPtr $ \requestMetadataRecv -> do
|
||||
withByteBufferPtr $ \recvBufferPtr -> do
|
||||
callDetails <- createCallDetails
|
||||
requestMetadataRecv' <- peek requestMetadataRecv
|
||||
recvRequestError <- grpcServerRequestCall
|
||||
server serverCallPtr callDetails
|
||||
requestMetadataRecv' cq cq (tag 101)
|
||||
pluckDeadline' <- secondsToDeadline 10
|
||||
requestCallCqEvent <- grpcCompletionQueuePluck cq (tag 101)
|
||||
pluckDeadline'
|
||||
reserved
|
||||
assertCqEventComplete requestCallCqEvent
|
||||
recvRequestError HU.@?= CallOk
|
||||
destroyCallDetails callDetails
|
||||
--receive request
|
||||
withOpArray 2 $ \recvOps -> do
|
||||
opSendInitialMetadataEmpty recvOps 0
|
||||
opRecvMessage recvOps 1 recvBufferPtr
|
||||
serverCall <- peek serverCallPtr
|
||||
recvBatchError <- grpcCallStartBatch serverCall recvOps 2
|
||||
(tag 102) reserved
|
||||
recvBatchError HU.@?= CallOk
|
||||
pluckDeadline'' <- secondsToDeadline 10
|
||||
recvCqEvent <- grpcCompletionQueuePluck cq (tag 102)
|
||||
pluckDeadline''
|
||||
reserved
|
||||
assertCqEventComplete recvCqEvent
|
||||
--send response
|
||||
withOpArray 3 $ \respOps -> do
|
||||
withByteStringAsByteBuffer "hello you" $ \respbb -> do
|
||||
cancelledPtr <- malloc
|
||||
opRecvCloseServer respOps 0 cancelledPtr
|
||||
opSendMessage respOps 1 respbb
|
||||
B.useAsCString "ok" $ \detailsStr ->
|
||||
opSendStatusServer respOps 2 0 (MetadataKeyValPtr nullPtr)
|
||||
StatusOk detailsStr
|
||||
serverCall <- peek serverCallPtr
|
||||
respBatchError <- grpcCallStartBatch serverCall respOps 3
|
||||
(tag 103) reserved
|
||||
respBatchError HU.@?= CallOk
|
||||
pluckDeadline''' <- secondsToDeadline 10
|
||||
respCqEvent <- grpcCompletionQueuePluck cq (tag 103)
|
||||
pluckDeadline'''
|
||||
reserved
|
||||
assertCqEventComplete respCqEvent
|
||||
--verify data was received
|
||||
serverRecv <- peek recvBufferPtr
|
||||
serverRecvBS <- copyByteBufferToByteString serverRecv
|
||||
serverRecvBS HU.@?= "hello world"
|
||||
--shut down
|
||||
grpcServerShutdownAndNotify server cq (tag 0)
|
||||
pluckDeadline'''' <- secondsToDeadline 10
|
||||
shutdownEvent <- grpcCompletionQueuePluck cq (tag 0) pluckDeadline''''
|
||||
reserved
|
||||
assertCqEventComplete shutdownEvent
|
||||
grpcServerCancelAllCalls server
|
||||
grpcServerDestroy server
|
||||
grpcCompletionQueueShutdown cq
|
||||
grpcCompletionQueueDestroy cq
|
||||
free serverCallPtr
|
||||
|
||||
-- | Straightforward translation of the gRPC core test end2end/tests/payload.c
|
||||
-- This is intended to test the low-level C bindings, so we use only a few
|
||||
-- minimal abstractions on top of it.
|
||||
testPayload :: TestTree
|
||||
testPayload = testCase "Unsafe request/response" $ do
|
||||
grpc $ payloadClient `concurrently` payloadServer
|
||||
|
||||
grpc :: IO a -> IO ()
|
||||
grpc = bracket_ grpcInit grpcShutdown . void
|
||||
|
||||
threadDelaySecs :: Int -> IO ()
|
||||
threadDelaySecs = threadDelay . (* 10^(6::Int))
|
||||
|
|
Loading…
Reference in a new issue