servant/servant-client/test/Servant/StreamSpec.hs
2019-03-31 13:21:17 +01:00

183 lines
6.4 KiB
Haskell

{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -freduction-depth=100 #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
module Servant.StreamSpec (spec) where
import Control.Monad
(when)
import Control.Monad.Codensity
(Codensity (..))
import Control.Monad.IO.Class
(MonadIO (..))
import Control.Monad.Trans.Except
import qualified Data.ByteString as BS
import Data.Proxy
import qualified Data.TDigest as TD
import qualified Network.HTTP.Client as C
import Prelude ()
import Prelude.Compat
import Servant.API
((:<|>) ((:<|>)), (:>), JSON, NetstringFraming, StreamBody,
NewlineFraming, NoFraming, OctetStream, SourceIO, StreamGet,
)
import Servant.Client.Streaming
import Servant.Server
import Servant.Test.ComprehensiveAPI
import Servant.Types.SourceT
import System.Entropy
(getEntropy, getHardwareEntropy)
import System.IO.Unsafe
(unsafePerformIO)
import System.Mem
(performGC)
import Test.Hspec
import Servant.ClientTestUtils (Person(..))
import qualified Servant.ClientTestUtils as CT
#if MIN_VERSION_base(4,10,0)
import GHC.Stats
(gc, gcdetails_live_bytes, getRTSStats)
#else
import GHC.Stats
(currentBytesUsed, getGCStats)
#endif
-- This declaration simply checks that all instances are in place.
-- Note: this is streaming client
_ = client comprehensiveAPI
spec :: Spec
spec = describe "Servant.Client.Streaming" $ do
streamSpec
type StreamApi =
"streamGetNewline" :> StreamGet NewlineFraming JSON (SourceIO Person)
:<|> "streamGetNetstring" :> StreamGet NetstringFraming JSON (SourceIO Person)
:<|> "streamALot" :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
:<|> "streamBody" :> StreamBody NoFraming OctetStream (SourceIO BS.ByteString) :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
api :: Proxy StreamApi
api = Proxy
getGetNL, getGetNS :: ClientM (SourceIO Person)
getGetALot :: ClientM (SourceIO BS.ByteString)
getStreamBody :: SourceT IO BS.ByteString -> ClientM (SourceIO BS.ByteString)
getGetNL :<|> getGetNS :<|> getGetALot :<|> getStreamBody = client api
alice :: Person
alice = Person "Alice" 42
bob :: Person
bob = Person "Bob" 25
server :: Application
server = serve api
$ return (source [alice, bob, alice])
:<|> return (source [alice, bob, alice])
-- 2 ^ (18 + 10) = 256M
:<|> return (SourceT ($ lots (powerOfTwo 18)))
:<|> return
where
lots n
| n < 0 = Stop
| otherwise = Effect $ do
let size = powerOfTwo 10
mbs <- getHardwareEntropy size
bs <- maybe (getEntropy size) pure mbs
return (Yield bs (lots (n - 1)))
powerOfTwo :: Int -> Int
powerOfTwo = (2 ^)
{-# NOINLINE manager' #-}
manager' :: C.Manager
manager' = unsafePerformIO $ C.newManager C.defaultManagerSettings
withClient :: ClientM a -> BaseUrl -> (Either ClientError a -> IO r) -> IO r
withClient x baseUrl' = withClientM x (mkClientEnv manager' baseUrl')
testRunSourceIO :: SourceIO a
-> IO (Either String [a])
testRunSourceIO = runExceptT . runSourceT
streamSpec :: Spec
streamSpec = beforeAll (CT.startWaiApp server) $ afterAll CT.endWaiApp $ do
it "works with Servant.API.StreamGet.Newline" $ \(_, baseUrl) -> do
withClient getGetNL baseUrl $ \(Right res) ->
testRunSourceIO res `shouldReturn` Right [alice, bob, alice]
it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
withClient getGetNS baseUrl $ \(Right res) ->
testRunSourceIO res `shouldReturn` Right [alice, bob, alice]
it "works with Servant.API.StreamBody" $ \(_, baseUrl) -> do
withClient (getStreamBody (source input)) baseUrl $ \(Right res) ->
testRunSourceIO res `shouldReturn` Right output
where
input = ["foo", "", "bar"]
output = ["foo", "bar"]
{-
it "streams in constant memory" $ \(_, baseUrl) -> do
Right rs <- runClient getGetALot baseUrl
performGC
-- usage0 <- getUsage
-- putStrLn $ "Start: " ++ show usage0
tdigest <- memoryUsage $ joinCodensitySourceT rs
-- putStrLn $ "Median: " ++ show (TD.median tdigest)
-- putStrLn $ "Mean: " ++ show (TD.mean tdigest)
-- putStrLn $ "Stddev: " ++ show (TD.stddev tdigest)
-- forM_ [0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99] $ \q ->
-- putStrLn $ "q" ++ show q ++ ": " ++ show (TD.quantile q tdigest)
let Just stddev = TD.stddev tdigest
-- standard deviation of 100k is ok, we generate 256M of data after all.
-- On my machine deviation is 40k-50k
stddev `shouldSatisfy` (< 100000)
memoryUsage :: SourceT IO BS.ByteString -> IO (TD.TDigest 25)
memoryUsage src = unSourceT src $ loop mempty (0 :: Int)
where
loop !acc !_ Stop = return acc
loop !_ !_ (Error err) = fail err -- !
loop !acc !n (Skip s) = loop acc n s
loop !acc !n (Effect ms) = ms >>= loop acc n
loop !acc !n (Yield _bs s) = do
usage <- liftIO getUsage
-- We perform GC in between as we generate garbage.
when (n `mod` 1024 == 0) $ liftIO performGC
loop (TD.insert usage acc) (n + 1) s
getUsage :: IO Double
getUsage = fromIntegral .
#if MIN_VERSION_base(4,10,0)
gcdetails_live_bytes . gc <$> getRTSStats
#else
currentBytesUsed <$> getGCStats
#endif
memUsed `shouldSatisfy` (< megabytes 22)
megabytes :: Num a => a -> a
megabytes n = n * (1000 ^ (2 :: Int))
-}