mirror of
https://github.com/unclechu/gRPC-haskell.git
synced 2024-11-26 21:19:43 +01:00
Criterion benchmarks (#50)
* begin bench executable * tweak benchmark, fork serverLoop for interruptibility * client streaming benchmarks * add server streaming handler * server streaming benchmark * bidi streaming benchmark * cleanup, create benchmark html * improve error messages * benchmarks: add bounds, remove -g, add -O2 * eliminate explicit sleep at shutdown * bump protobuf-wire version * remove superfluous parens, remove benchmarks.html
This commit is contained in:
parent
c1fa7956c7
commit
0d70a6c960
6 changed files with 193 additions and 1078 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -5,3 +5,4 @@ dist
|
|||
*.o
|
||||
examples/echo/echo-cpp/echo-client
|
||||
examples/echo/echo-cpp/echo-server
|
||||
benchmarks.html
|
||||
|
|
167
bench/Bench.hs
Normal file
167
bench/Bench.hs
Normal file
|
@ -0,0 +1,167 @@
|
|||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (bracket)
|
||||
import Control.Monad
|
||||
import Criterion.Main
|
||||
import Criterion.Types (Config(..))
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
import Data.Word
|
||||
import Data.Protobuf.Wire.Class
|
||||
import Data.Protobuf.Wire.Types
|
||||
import GHC.Generics (Generic)
|
||||
import Network.GRPC.HighLevel.Server hiding (serverLoop)
|
||||
import Network.GRPC.HighLevel.Server.Unregistered (serverLoop)
|
||||
import Network.GRPC.LowLevel
|
||||
import System.Random (randomRIO)
|
||||
|
||||
data AddRequest = AddRequest {addX :: Fixed Word32
|
||||
, addY :: Fixed Word32}
|
||||
deriving (Show, Eq, Ord, Generic)
|
||||
instance Message AddRequest
|
||||
|
||||
data AddResponse = AddResponse (Fixed Word32)
|
||||
deriving (Show, Eq, Ord, Generic)
|
||||
instance Message AddResponse
|
||||
|
||||
addMethod :: MethodName
|
||||
addMethod = MethodName "/unary"
|
||||
|
||||
addClientStreamMethod :: MethodName
|
||||
addClientStreamMethod = MethodName "/clientstream"
|
||||
|
||||
addServerStreamMethod :: MethodName
|
||||
addServerStreamMethod = MethodName "/serverstream"
|
||||
|
||||
addBiDiMethod :: MethodName
|
||||
addBiDiMethod = MethodName "/bidistream"
|
||||
|
||||
addHandler :: Handler 'Normal
|
||||
addHandler =
|
||||
UnaryHandler addMethod $
|
||||
\c -> do
|
||||
let b = payload c
|
||||
return ( AddResponse $ addX b + addY b
|
||||
, metadata c
|
||||
, StatusOk
|
||||
, StatusDetails ""
|
||||
)
|
||||
|
||||
addClientStreamHandler :: Handler 'ClientStreaming
|
||||
addClientStreamHandler =
|
||||
ClientStreamHandler addClientStreamMethod $
|
||||
\_ recv -> do
|
||||
answer <- go recv 0
|
||||
return (Just answer, mempty, StatusOk, "")
|
||||
where go recv !i = do
|
||||
req <- recv
|
||||
case req of
|
||||
Left _ -> return $ AddResponse i
|
||||
Right Nothing -> return $ AddResponse i
|
||||
Right (Just (AddRequest x y)) -> go recv (i+x+y)
|
||||
|
||||
addServerStreamHandler :: Handler 'ServerStreaming
|
||||
addServerStreamHandler =
|
||||
ServerStreamHandler addServerStreamMethod $
|
||||
\c send -> do
|
||||
let AddRequest (Fixed x) y = payload c
|
||||
replicateM_ (fromIntegral x) $ send $ AddResponse y
|
||||
return (mempty, StatusOk, "")
|
||||
|
||||
addBiDiHandler :: Handler 'BiDiStreaming
|
||||
addBiDiHandler = BiDiStreamHandler addBiDiMethod (go 0)
|
||||
where go :: Fixed Word32 -> ServerRWHandler AddRequest AddResponse
|
||||
go !i c recv send = do
|
||||
req <- recv
|
||||
case req of
|
||||
Left _ -> return (mempty, StatusOk, "")
|
||||
Right Nothing -> return (mempty, StatusOk, "")
|
||||
Right (Just (AddRequest x y)) -> do
|
||||
let curr = i + x + y
|
||||
void $ send $ AddResponse curr
|
||||
go curr c recv send
|
||||
|
||||
|
||||
serverOpts :: ServerOptions
|
||||
serverOpts =
|
||||
defaultOptions{optNormalHandlers = [addHandler]
|
||||
, optClientStreamHandlers = [addClientStreamHandler]
|
||||
, optServerStreamHandlers = [addServerStreamHandler]
|
||||
, optBiDiStreamHandlers = [addBiDiHandler]}
|
||||
|
||||
threadDelaySecs :: Int -> IO ()
|
||||
threadDelaySecs = threadDelay . (* 10^(6::Int))
|
||||
|
||||
main :: IO ()
|
||||
main = bracket startServer stopServer $ const $ withGRPC $ \grpc ->
|
||||
withClient grpc (ClientConfig "localhost" 50051 []) $ \c -> do
|
||||
rmAdd <- clientRegisterMethodNormal c addMethod
|
||||
rmClientStream <- clientRegisterMethodClientStreaming c addClientStreamMethod
|
||||
rmServerStream <- clientRegisterMethodServerStreaming c addServerStreamMethod
|
||||
rmBiDiStream <- clientRegisterMethodBiDiStreaming c addBiDiMethod
|
||||
defaultMainWith
|
||||
defaultConfig{reportFile = Just "benchmarks.html"}
|
||||
[ bench "unary request" $ nfIO (addRequest c rmAdd)
|
||||
, bench "client stream: 100 messages" $ nfIO (addClientStream c rmClientStream 100)
|
||||
, bench "client stream: 1k messages" $ nfIO (addClientStream c rmClientStream 1000)
|
||||
, bench "client stream: 10k messages" $ nfIO (addClientStream c rmClientStream 10000)
|
||||
, bench "server stream: 100 messages" $ nfIO (addServerStream c rmServerStream 100)
|
||||
, bench "server stream: 1k messages" $ nfIO (addServerStream c rmServerStream 1000)
|
||||
, bench "server stream: 10k messages" $ nfIO (addServerStream c rmServerStream 10000)
|
||||
, bench "bidi stream: 50 messages up, 50 down" $ nfIO (bidiStream c rmBiDiStream 50)
|
||||
, bench "bidi stream: 500 message up, 500 down" $ nfIO (bidiStream c rmBiDiStream 500)
|
||||
, bench "bidi stream: 5000 messages up, 5000 down" $ nfIO (bidiStream c rmBiDiStream 5000)]
|
||||
|
||||
where startServer = do
|
||||
sThrd <- async $ serverLoop serverOpts
|
||||
threadDelaySecs 1
|
||||
return sThrd
|
||||
|
||||
stopServer sThrd = cancel sThrd >> void (waitCatch sThrd)
|
||||
|
||||
encode = BL.toStrict . toLazyByteString
|
||||
|
||||
addRequest c rmAdd = do
|
||||
x <- liftM Fixed $ randomRIO (0,1000)
|
||||
y <- liftM Fixed $ randomRIO (0,1000)
|
||||
let addEnc = BL.toStrict . toLazyByteString $ AddRequest x y
|
||||
clientRequest c rmAdd 5 addEnc mempty >>= \case
|
||||
Left e -> fail $ "Got client error on add request: " ++ show e
|
||||
Right r -> case fromByteString (rspBody r) of
|
||||
Left e -> fail $ "failed to decode add response: " ++ show e
|
||||
Right dec
|
||||
| dec == AddResponse (x + y) -> return ()
|
||||
| otherwise -> fail $ "Got wrong add answer: " ++ show dec ++ "; expected: " ++ show x ++ " + " ++ show y ++ " = " ++ show (x+y)
|
||||
|
||||
addClientStream c rm i = do
|
||||
let msg = encode $ AddRequest 1 0
|
||||
Right (Just r,_,_,_,_) <- clientWriter c rm 5 mempty $ \send -> do
|
||||
replicateM_ i $ send msg
|
||||
let decoded = fromByteString r
|
||||
when (decoded /= Right (AddResponse (fromIntegral i))) $
|
||||
fail $ "clientStream: bad answer: " ++ show decoded ++ "; expected: " ++ show i
|
||||
|
||||
addServerStream c rm i = do
|
||||
let msg = encode $ AddRequest (fromIntegral i) 2
|
||||
Right (_, _, sd) <- clientReader c rm 5 msg mempty $ \_ recv ->
|
||||
replicateM_ i $ do
|
||||
Right (Just bs) <- recv
|
||||
let Right decoded = fromByteString bs
|
||||
when (decoded /= AddResponse 2) $
|
||||
fail $ "serverStream: bad response of " ++ show decoded ++ "; expected 2."
|
||||
when (sd /= mempty) $ fail $ "bad status details: " ++ show sd
|
||||
|
||||
bidiStream c rm i = do
|
||||
Right (_, _, sd) <- clientRW c rm 5 mempty $ \_ recv send ->
|
||||
forM_ (take i [2,4..]) $ \n -> do
|
||||
void $ send $ encode $ AddRequest 1 1
|
||||
Right (Just bs) <- recv
|
||||
let Right decoded = fromByteString bs
|
||||
when (decoded /= AddResponse n) $
|
||||
fail $ "bidiStream: got: " ++ show decoded ++ "expected: " ++ show n
|
||||
when (sd /= mempty) $ fail $ "bad StatusDetails: " ++ show sd
|
1074
benchmarks.html
1074
benchmarks.html
File diff suppressed because one or more lines are too long
|
@ -36,10 +36,10 @@ library
|
|||
, proto3-wire
|
||||
, protobuf-wire
|
||||
|
||||
, async
|
||||
, async ==2.1.*
|
||||
, tasty >= 0.11 && <0.12
|
||||
, tasty-hunit >= 0.9 && <0.10
|
||||
, safe
|
||||
, safe ==0.3.*
|
||||
, vector
|
||||
|
||||
c-sources:
|
||||
|
@ -198,3 +198,21 @@ test-suite test
|
|||
if flag(debug)
|
||||
CPP-Options: -DDEBUG
|
||||
CC-Options: -DGRPC_HASKELL_DEBUG
|
||||
|
||||
benchmark bench
|
||||
type: exitcode-stdio-1.0
|
||||
build-depends:
|
||||
base ==4.8.*
|
||||
, grpc-haskell
|
||||
, async ==2.1.*
|
||||
, criterion ==1.1.*
|
||||
, protobuf-wire
|
||||
, bytestring ==0.10.*
|
||||
, random >=1.0.0
|
||||
hs-source-dirs: bench
|
||||
main-is: Bench.hs
|
||||
ghc-options: -Wall -O2 -threaded -rtsopts -with-rtsopts=-N
|
||||
if flag(debug)
|
||||
CPP-Options: -DDEBUG
|
||||
CC-Options: -DGRPC_HASKELL_DEBUG
|
||||
default-language: Haskell2010
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
module Network.GRPC.HighLevel.Server.Unregistered where
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Arrow
|
||||
import qualified Control.Exception as CE
|
||||
import Control.Monad
|
||||
|
@ -81,7 +82,9 @@ dispatchLoop server meta hN hC hS hB =
|
|||
|
||||
serverLoop :: ServerOptions -> IO ()
|
||||
serverLoop ServerOptions{..} =
|
||||
withGRPC $ \grpc ->
|
||||
-- We run the loop in a new thread so that we can kill the serverLoop thread.
|
||||
-- Without this fork, we block on a foreign call, which can't be interrupted.
|
||||
void $ forkIO $ withGRPC $ \grpc ->
|
||||
withServer grpc config $ \server -> do
|
||||
dispatchLoop server
|
||||
optInitialMetadata
|
||||
|
|
|
@ -9,7 +9,7 @@ packages:
|
|||
- '.'
|
||||
- location:
|
||||
git: git@github.mv.awakenetworks.net:awakenetworks/protobuf-wire.git
|
||||
commit: 62aa5b92f21883d14bf8d3beed5645f84da01ad6
|
||||
commit: 676a99af41a664660d269c475832301873062a37
|
||||
extra-dep: true
|
||||
- location:
|
||||
git: git@github.com:awakenetworks/proto3-wire.git
|
||||
|
|
Loading…
Reference in a new issue