servant/servant-client/test/Servant/StreamSpec.hs
Oleg Grenrus 45c1cbdfd5 Refactor Stream stuff
- Introduce `SourceT`, which is a simple variant of a "correct `ListT`".
  Other variants are possible (like the one in `streaming`),
  but I'm not sure there's much real difference.

- Introduce `Codensity`. There's a flag if people don't want to depend
  on `kan-extensions`.

- `StreamGenerator` and `ResultStream` are both `SourceT`.
  `Stream` combinator in `servant-client` uses `Codensity` for CPS.

- Add servant-machines, servant-conduit, servant-pipes
- Add a streaming cookbook (just code, no explanations).
- Add a script to run streaming 'benchmarks'
2018-11-05 15:48:47 +02:00

175 lines
5.8 KiB
Haskell

{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -freduction-depth=100 #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
module Servant.StreamSpec (spec) where
import Control.Monad
(when)
import Control.Monad.Codensity
(Codensity (..))
import Control.Monad.IO.Class
(MonadIO (..))
import Control.Monad.Trans.Except
import qualified Data.ByteString as BS
import Data.Proxy
import qualified Data.TDigest as TD
import qualified Network.HTTP.Client as C
import Prelude ()
import Prelude.Compat
import Servant.API
((:<|>) ((:<|>)), (:>), JSON, NetstringFraming,
NewlineFraming, NoFraming, OctetStream, SourceIO, StreamGet)
import Servant.Client
import Servant.ClientSpec
(Person (..))
import qualified Servant.ClientSpec as CS
import Servant.Server
import Servant.Types.SourceT
import System.Entropy
(getEntropy, getHardwareEntropy)
import System.IO.Unsafe
(unsafePerformIO)
import System.Mem
(performGC)
import Test.Hspec
#if MIN_VERSION_base(4,10,0)
import GHC.Stats
(gc, gcdetails_live_bytes, getRTSStats)
#else
import GHC.Stats
(currentBytesUsed, getGCStats)
#endif
-- | The module's only export: every streaming test, grouped under
-- "Servant.Stream".
spec :: Spec
spec = describe "Servant.Stream" streamSpec
-- | Endpoints exercised by this spec:
--
-- * two 'Person' streams that differ only in their framing
--   (newline vs. netstring), both JSON-encoded;
-- * one unframed octet stream of raw 'BS.ByteString' chunks.
type StreamApi
    =    "streamGetNewline"   :> StreamGet NewlineFraming   JSON        (SourceIO Person)
    :<|> "streamGetNetstring" :> StreamGet NetstringFraming JSON        (SourceIO Person)
    :<|> "streamALot"         :> StreamGet NoFraming        OctetStream (SourceIO BS.ByteString)

-- | Value-level handle on 'StreamApi', passed to both 'serve' and 'client'.
api :: Proxy StreamApi
api = Proxy
-- | Client functions generated from 'api', bound in the same order as the
-- endpoints in 'StreamApi'. Each yields its stream wrapped in 'Codensity'.
getGetNL   :: ClientM (Codensity IO (SourceIO Person))
getGetNS   :: ClientM (Codensity IO (SourceIO Person))
getGetALot :: ClientM (Codensity IO (SourceIO BS.ByteString))
getGetNL :<|> getGetNS :<|> getGetALot = client api
-- | Fixture values the test server streams and the tests expect back.
alice, bob :: Person
alice = Person "Alice" 42
bob   = Person "Bob" 25
-- | Test server implementing 'StreamApi'.
--
-- Both 'Person' endpoints stream @[alice, bob, alice]@. @streamALot@ streams
-- @2 ^ 18@ chunks of @2 ^ 10@ random bytes each, i.e. @2 ^ (18 + 10)@ bytes
-- (256 MiB) in total. Bytes come from the hardware RNG when available,
-- otherwise from 'getEntropy'.
server :: Application
server = serve api
    $    return (source [alice, bob, alice])
    :<|> return (source [alice, bob, alice])
    -- 2 ^ (18 + 10) bytes = 256 MiB
    :<|> return (SourceT ($ lots (powerOfTwo 18)))
  where
    -- Emit exactly @n@ chunks, then stop. A guard of @n < 0@ would also emit
    -- a chunk for @n == 0@, producing one chunk more than the size
    -- advertised above.
    lots :: Int -> StepT IO BS.ByteString
    lots n
        | n <= 0    = Stop
        | otherwise = Effect $ do
            let size = powerOfTwo 10
            mbs <- getHardwareEntropy size
            bs <- maybe (getEntropy size) pure mbs
            return (Yield bs (lots (n - 1)))

    powerOfTwo :: Int -> Int
    powerOfTwo = (2 ^)
-- | HTTP connection manager shared by every client call in this spec.
--
-- It is created once through 'unsafePerformIO'. The @NOINLINE@ pragma stops
-- GHC from inlining the definition, which could otherwise create more than
-- one manager.
{-# NOINLINE manager' #-}
manager' :: C.Manager
manager' = unsafePerformIO (C.newManager C.defaultManagerSettings)
-- | Run a client action against @baseUrl'@ using the shared 'manager''.
-- Request failures are returned as 'Left', not thrown.
runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a)
runClient x baseUrl' =
    let env = mkClientEnv manager' baseUrl'
    in  runClientM x env
-- | Drain a 'Codensity'-wrapped stream into a list of its elements. The
-- result is 'Left' with a 'String' message if the stream reports an error.
testRunSourceIO :: Codensity IO (SourceIO a) -> IO (Either String [a])
testRunSourceIO cod = runExceptT (runSourceT (joinCodensitySourceT cod))
-- | Flatten a continuation-wrapped source into a plain 'SourceT'.
--
-- Each time the result is run, the outer 'Codensity' is entered. The inner
-- source is then consumed inside that continuation, so anything the
-- continuation brackets stays in scope for the whole consumption.
joinCodensitySourceT :: Codensity m (SourceT m a) -> SourceT m a
joinCodensitySourceT cod =
    SourceT (\k -> runCodensity cod (\inner -> unSourceT inner k))
-- | Integration tests against 'server'.
--
-- The server is started once for the whole group ('beforeAll') and shut down
-- afterwards ('afterAll'). Each test streams from one framing endpoint
-- through the generated client. It checks that the decoded elements equal
-- what the server sent.
streamSpec :: Spec
streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
    it "works with Servant.API.StreamGet.Newline" $ \(_, baseUrl) -> do
        res <- expectRight =<< runClient getGetNL baseUrl
        testRunSourceIO res `shouldReturn` Right [alice, bob, alice]
    it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
        res <- expectRight =<< runClient getGetNS baseUrl
        testRunSourceIO res `shouldReturn` Right [alice, bob, alice]
    {-
    NOTE(review): this disabled test would not compile if simply
    uncommented. `memUsed` (near the end) is not defined anywhere in this
    module. `memoryUsage` uses bang patterns, but BangPatterns is not among
    this module's LANGUAGE pragmas (check the package's default extensions).

    it "streams in constant memory" $ \(_, baseUrl) -> do
        Right rs <- runClient getGetALot baseUrl
        performGC
        -- usage0 <- getUsage
        -- putStrLn $ "Start: " ++ show usage0
        tdigest <- memoryUsage $ joinCodensitySourceT rs
        -- putStrLn $ "Median: " ++ show (TD.median tdigest)
        -- putStrLn $ "Mean: " ++ show (TD.mean tdigest)
        -- putStrLn $ "Stddev: " ++ show (TD.stddev tdigest)
        -- forM_ [0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99] $ \q ->
        --     putStrLn $ "q" ++ show q ++ ": " ++ show (TD.quantile q tdigest)
        let Just stddev = TD.stddev tdigest
        -- standard deviation of 100k is ok, we generate 256M of data after all.
        -- On my machine deviation is 40k-50k
        stddev `shouldSatisfy` (< 100000)

memoryUsage :: SourceT IO BS.ByteString -> IO (TD.TDigest 25)
memoryUsage src = unSourceT src $ loop mempty (0 :: Int)
  where
    loop !acc !_ Stop = return acc
    loop !_ !_ (Error err) = fail err -- !
    loop !acc !n (Skip s) = loop acc n s
    loop !acc !n (Effect ms) = ms >>= loop acc n
    loop !acc !n (Yield _bs s) = do
        usage <- liftIO getUsage
        -- We perform GC in between as we generate garbage.
        when (n `mod` 1024 == 0) $ liftIO performGC
        loop (TD.insert usage acc) (n + 1) s

getUsage :: IO Double
getUsage = fromIntegral .
#if MIN_VERSION_base(4,10,0)
    gcdetails_live_bytes . gc <$> getRTSStats
#else
    currentBytesUsed <$> getGCStats
#endif
    memUsed `shouldSatisfy` (< megabytes 22)

megabytes :: Num a => a -> a
megabytes n = n * (1000 ^ (2 :: Int))
    -}
  where
    -- Unwrap a successful client result. On failure, fail the test with the
    -- 'ServantError' itself rather than an opaque pattern-match failure.
    expectRight :: Either ServantError a -> IO a
    expectRight = either (\err -> fail ("client request failed: " ++ show err)) pure