gRPC-haskell/src/Network/GRPC/LowLevel/Op.hs
Joel Stanley 96d12c1e6c Preliminary streaming mode support (client streaming, server streaming, bidirectional) (#37)
* Tweak runOps param order, inline common op sequences, clean up serverHandleNormalCall

* More ops sequence inlining for clarity, experimenting with Managed

* Checkpoint: preliminary support for all streaming modes; much cleanup/refactoring and api design still needed

* Use mempty for default StatusDetails; tweak bad status matching mechanism

* Preliminary user-facing, server-streaming, low-level api and test

* renaming wibbles

* Preliminary user-facing, client-streaming, low-level api and test

* Move sendMsgs comb to Network.GRPC.LowLevel.Op; misc cleanup/DCR

* Modify bidi streaming to omit request payload

* Add transformers dep

* Preliminary user-facing low-level bidirectional streaming api and test

* Fix missing peek import

* Remove TimeoutSeconds params on streaming mode functions

* Fix serverHandleNormalCall rebase wart

* Fix rebase warts; minor hlint fixes and wibbles

* Post-rebase tweaks to optional payload use in serverRequestCall (i.e., now respects payloadHandling again)

* Cleanup/refactor serverRequestCall

* Fix comment

* Change ServerRWHandler type so that handler does not have to invoke a finalizer

* Change ServerReaderHandler type so that handler does not have to invoke a finalizer

* Simplify serverWriter interface and ServerWriterHandler structure

* Simplify serverRW (get rid of exec param), improve bidi streaming tests

* Use ExceptT in serverRW impl

* Change ServerRWHandler type to pass recv/send operations.

* Renaming

* Define ClientRWHandler, pass recv/send ops

* wibbles

* Use ExceptT in clientRW impl

* Add DataKinded phantom typing to RegisteredMethod; misc cleanup

* Simplify sendMsgs interface; add SingleSend type and related helpers

* Rename SingleSend to SendSingle, use ExceptT to clean up {client,server}Writer and sendMsgs

* More ExceptT cleanup in clientWriter

* Factor out reusable bits of clientWriter

* Shrink ServerReaderHandler

* Delete stale comments

* Use common machinery for all streaming modes; make handler types more consistent

* wibbles
2016-07-06 08:59:38 -05:00

357 lines
15 KiB
Haskell

{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
module Network.GRPC.LowLevel.Op where
import Control.Arrow
import Control.Exception
import Control.Monad
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Except
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Foreign.C.String (CString)
import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc,
mallocBytes)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek, poke)
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C
import qualified Network.GRPC.Unsafe.Slice as C (Slice, freeSlice)
import Pipes ((>->))
import qualified Pipes as P
import qualified Pipes.Core as P
-- | Sum describing all possible send and receive operations that can be batched
-- and executed by gRPC. Usually these are processed in a handful of
-- combinations depending on the 'MethodType' of the call being run.
data Op = OpSendInitialMetadata MetadataMap
| OpSendMessage B.ByteString
| OpSendCloseFromClient
| OpSendStatusFromServer MetadataMap C.StatusCode StatusDetails
| OpRecvInitialMetadata
| OpRecvMessage
| OpRecvStatusOnClient
| OpRecvCloseOnServer
deriving (Eq, Show)
-- | Container holding the pointers to the C and gRPC data needed to execute the
-- corresponding 'Op'. These are obviously unsafe, and should only be used with
-- 'withOpContexts'.
data OpContext =
OpSendInitialMetadataContext C.MetadataKeyValPtr Int
| OpSendMessageContext (C.ByteBuffer, C.Slice)
| OpSendCloseFromClientContext
| OpSendStatusFromServerContext C.MetadataKeyValPtr Int C.StatusCode
B.ByteString
| OpRecvInitialMetadataContext (Ptr C.MetadataArray)
| OpRecvMessageContext (Ptr C.ByteBuffer)
| OpRecvStatusOnClientContext (Ptr C.MetadataArray) (Ptr C.StatusCode)
(Ptr CString)
| OpRecvCloseOnServerContext (Ptr CInt)
deriving Show
-- | Length we pass to gRPC for receiving status details
-- when processing 'OpRecvStatusOnClient'. It appears that gRPC actually ignores
-- this length and reallocates a longer string if necessary.
defaultStatusStringLen :: Int
defaultStatusStringLen = 128
-- | Allocates and initializes the 'Opcontext' corresponding to the given 'Op'.
createOpContext :: Op -> IO OpContext
createOpContext (OpSendInitialMetadata m) =
OpSendInitialMetadataContext
<$> C.createMetadata m
<*> return (M.size m)
createOpContext (OpSendMessage bs) =
fmap OpSendMessageContext (C.createByteBuffer bs)
createOpContext (OpSendCloseFromClient) = return OpSendCloseFromClientContext
createOpContext (OpSendStatusFromServer m code (StatusDetails str)) =
OpSendStatusFromServerContext
<$> C.createMetadata m
<*> return (M.size m)
<*> return code
<*> return str
createOpContext OpRecvInitialMetadata =
fmap OpRecvInitialMetadataContext C.metadataArrayCreate
createOpContext OpRecvMessage =
fmap OpRecvMessageContext C.createReceivingByteBuffer
createOpContext OpRecvStatusOnClient = do
pmetadata <- C.metadataArrayCreate
pstatus <- C.createStatusCodePtr
pstr <- malloc
cstring <- mallocBytes defaultStatusStringLen
poke pstr cstring
return $ OpRecvStatusOnClientContext pmetadata pstatus pstr
createOpContext OpRecvCloseOnServer =
fmap OpRecvCloseOnServerContext $ malloc
-- | Mutates the given raw array of ops at the given index according to the
-- given 'OpContext'.
setOpArray :: C.OpArray -> Int -> OpContext -> IO ()
setOpArray arr i (OpSendInitialMetadataContext kvs l) =
C.opSendInitialMetadata arr i kvs l
setOpArray arr i (OpSendMessageContext (bb,_)) =
C.opSendMessage arr i bb
setOpArray arr i OpSendCloseFromClientContext =
C.opSendCloseClient arr i
setOpArray arr i (OpSendStatusFromServerContext kvs l code details) =
B.useAsCString details $ \cstr ->
C.opSendStatusServer arr i l kvs code cstr
setOpArray arr i (OpRecvInitialMetadataContext pmetadata) =
C.opRecvInitialMetadata arr i pmetadata
setOpArray arr i (OpRecvMessageContext pbb) =
C.opRecvMessage arr i pbb
setOpArray arr i (OpRecvStatusOnClientContext pmetadata pstatus pstr) = do
C.opRecvStatusClient arr i pmetadata pstatus pstr defaultStatusStringLen
setOpArray arr i (OpRecvCloseOnServerContext pcancelled) = do
C.opRecvCloseServer arr i pcancelled
-- | Cleans up an 'OpContext'.
freeOpContext :: OpContext -> IO ()
freeOpContext (OpSendInitialMetadataContext m _) = C.metadataFree m
freeOpContext (OpSendMessageContext (bb, s)) =
C.grpcByteBufferDestroy bb >> C.freeSlice s
freeOpContext OpSendCloseFromClientContext = return ()
freeOpContext (OpSendStatusFromServerContext metadata _ _ _) =
C.metadataFree metadata
freeOpContext (OpRecvInitialMetadataContext metadata) =
C.metadataArrayDestroy metadata
freeOpContext (OpRecvMessageContext pbb) =
C.destroyReceivingByteBuffer pbb
freeOpContext (OpRecvStatusOnClientContext metadata pcode pstr) = do
C.metadataArrayDestroy metadata
C.destroyStatusCodePtr pcode
str <- peek pstr
free str
free pstr
freeOpContext (OpRecvCloseOnServerContext pcancelled) =
grpcDebug ("freeOpContext: freeing pcancelled: " ++ show pcancelled)
>> free pcancelled
-- | Allocates an `OpArray` and a list of `OpContext`s from the given list of
-- `Op`s.
withOpArrayAndCtxts :: [Op] -> ((C.OpArray, [OpContext]) -> IO a) -> IO a
withOpArrayAndCtxts ops = bracket setup teardown
where setup = do ctxts <- mapM createOpContext ops
let l = length ops
arr <- C.opArrayCreate l
sequence_ $ zipWith (setOpArray arr) [0..l-1] ctxts
return (arr, ctxts)
teardown (arr, ctxts) = do C.opArrayDestroy arr (length ctxts)
mapM_ freeOpContext ctxts
-- | Container holding GC-managed results for 'Op's which receive data.
data OpRecvResult =
OpRecvInitialMetadataResult MetadataMap
| OpRecvMessageResult (Maybe B.ByteString)
-- ^ If a streaming call is in progress and the stream terminates normally,
-- or If the client or server dies, we might not receive a response body, in
-- which case this will be 'Nothing'.
| OpRecvStatusOnClientResult MetadataMap C.StatusCode B.ByteString
| OpRecvCloseOnServerResult Bool -- ^ True if call was cancelled.
deriving (Eq, Show)
-- | For the given 'OpContext', if the 'Op' receives data, copies the data out
-- of the 'OpContext' and into GC-managed Haskell types. After this, it is safe
-- to destroy the 'OpContext'.
resultFromOpContext :: OpContext -> IO (Maybe OpRecvResult)
resultFromOpContext (OpRecvInitialMetadataContext pmetadata) = do
grpcDebug "resultFromOpContext: OpRecvInitialMetadataContext"
metadata <- peek pmetadata
metadataMap <- C.getAllMetadataArray metadata
return $ Just $ OpRecvInitialMetadataResult metadataMap
resultFromOpContext (OpRecvMessageContext pbb) = do
grpcDebug "resultFromOpContext: OpRecvMessageContext"
bb@(C.ByteBuffer bbptr) <- peek pbb
if bbptr == nullPtr
then return $ Just $ OpRecvMessageResult Nothing
else do bs <- C.copyByteBufferToByteString bb
grpcDebug "resultFromOpContext: bb copied."
return $ Just $ OpRecvMessageResult (Just bs)
resultFromOpContext (OpRecvStatusOnClientContext pmetadata pcode pstr) = do
grpcDebug "resultFromOpContext: OpRecvStatusOnClientContext"
metadata <- peek pmetadata
metadataMap <- C.getAllMetadataArray metadata
code <- C.derefStatusCodePtr pcode
cstr <- peek pstr
statusInfo <- B.packCString cstr
return $ Just $ OpRecvStatusOnClientResult metadataMap code statusInfo
resultFromOpContext (OpRecvCloseOnServerContext pcancelled) = do
grpcDebug "resultFromOpContext: OpRecvCloseOnServerContext"
cancelled <- fmap (\x -> if x > 0 then True else False)
(peek pcancelled)
return $ Just $ OpRecvCloseOnServerResult cancelled
resultFromOpContext _ = do
grpcDebug "resultFromOpContext: saw non-result op type."
return Nothing
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the deadline on the
-- associated call has been reached.
-- TODO: now that we distinguish between different types
-- of calls at the type level, we could try to limit the input 'Op's more
-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a
-- registered call, because gRPC handles that for us.
-- TODO: the list of 'Op's type is less specific than it could be. There are
-- only a few different sequences of 'Op's we will see in practice. Once we
-- figure out what those are, we should create a more specific sum
-- type. However, since ops can fail, the list of 'OpRecvResult' returned by
-- 'runOps' can vary in their contents and are perhaps less amenable to
-- simplification. In the meantime, from looking at the core tests, it looks
-- like it is safe to say that we always get a
-- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use the same 'Op' twice in
-- the same batch, so we might want to change the list to a set. I don't think
-- order matters within a batch. Need to check.
runOps :: C.Call
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-- ^ The list of 'Op's to execute.
-> IO (Either GRPCIOError [OpRecvResult])
runOps call cq ops =
let l = length ops in
withOpArrayAndCtxts ops $ \(opArray, contexts) -> do
grpcDebug $ "runOps: allocated op contexts: " ++ show contexts
tag <- newTag cq
grpcDebug $ "runOps: tag: " ++ show tag
callError <- startBatch cq call opArray l tag
grpcDebug $ "runOps: called start_batch. callError: "
++ (show callError)
case callError of
Left x -> return $ Left x
Right () -> do
ev <- pluck cq tag Nothing
grpcDebug $ "runOps: pluck returned " ++ show ev
case ev of
Right () -> do
grpcDebug "runOps: got good op; starting."
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
Left err -> return $ Left err
runOps' :: C.Call
-> CompletionQueue
-> [Op]
-> ExceptT GRPCIOError IO [OpRecvResult]
runOps' c cq = ExceptT . runOps c cq
-- | If response status info is present in the given 'OpRecvResult's, returns
-- a tuple of trailing metadata, status code, and status details.
extractStatusInfo :: [OpRecvResult]
-> Maybe (MetadataMap, C.StatusCode, B.ByteString)
extractStatusInfo [] = Nothing
extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) =
Just (meta, code, details)
extractStatusInfo (_:xs) = extractStatusInfo xs
--------------------------------------------------------------------------------
-- Types and helpers for common ops batches
type SendSingle a
= C.Call
-> CompletionQueue
-> a
-> ExceptT GRPCIOError IO ()
type RecvSingle a
= C.Call
-> CompletionQueue
-> ExceptT GRPCIOError IO a
sendSingle :: SendSingle Op
sendSingle c cq op = void (runOps' c cq [op])
sendInitialMetadata :: SendSingle MetadataMap
sendInitialMetadata c cq = sendSingle c cq . OpSendInitialMetadata
sendStatusFromServer :: SendSingle (MetadataMap, C.StatusCode, StatusDetails)
sendStatusFromServer c cq (md, st, ds) =
sendSingle c cq (OpSendStatusFromServer md st ds)
recvInitialMetadata :: RecvSingle MetadataMap
recvInitialMetadata c cq = runOps' c cq [OpRecvInitialMetadata] >>= \case
[OpRecvInitialMetadataResult md]
-> return md
_ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMetadata")
recvStatusOnClient :: RecvSingle (MetadataMap, C.StatusCode, StatusDetails)
recvStatusOnClient c cq = runOps' c cq [OpRecvStatusOnClient] >>= \case
[OpRecvStatusOnClientResult md st ds]
-> return (md, st, StatusDetails ds)
_ -> throwE (GRPCIOInternalUnexpectedRecv "recvStatusOnClient")
--------------------------------------------------------------------------------
-- Streaming types and helpers
-- | Requests use Nothing to denote read, Just to denote
-- write. Right-constructed responses use Just to indicate a successful read,
-- and Nothing to denote end of stream when reading or a successful write.
type Streaming a =
P.Client (Maybe ByteString) (Either GRPCIOError (Maybe ByteString)) IO a
-- | Run the given 'Streaming' operation via an appropriate upstream
-- proxy. I.e., if called on the client side, the given 'Streaming' operation
-- talks to a server proxy, and vice versa.
runStreamingProxy :: String
-- ^ context string for including in errors
-> C.Call
-- ^ the call associated with this streaming operation
-> CompletionQueue
-- ^ the completion queue for ops batches
-> Streaming a
-- ^ the requesting side of the streaming operation
-> ExceptT GRPCIOError IO a
runStreamingProxy nm c cq
= ExceptT . P.runEffect . (streamingProxy nm c cq P.+>>) . fmap Right
streamingProxy :: String
-- ^ context string for including in errors
-> C.Call
-- ^ the call associated with this streaming operation
-> CompletionQueue
-- ^ the completion queue for ops batches
-> Maybe ByteString
-- ^ the request to the proxy
-> P.Server
(Maybe ByteString)
(Either GRPCIOError (Maybe ByteString))
IO (Either GRPCIOError a)
streamingProxy nm c cq = maybe recv send
where
recv = run [OpRecvMessage] >>= \case
RecvMsgRslt mr -> rsp mr >>= streamingProxy nm c cq
Right{} -> err (urecv "recv")
Left e -> err e
send msg = run [OpSendMessage msg] >>= \case
Right [] -> rsp Nothing >>= streamingProxy nm c cq
Right _ -> err (urecv "send")
Left e -> err e
err e = P.respond (Left e) >> return (Left e)
rsp = P.respond . Right
run = lift . runOps c cq
urecv = GRPCIOInternalUnexpectedRecv . (nm ++)
type StreamRecv = Streaming (Either GRPCIOError (Maybe ByteString))
streamRecv :: StreamRecv
streamRecv = P.request Nothing
type StreamSend = ByteString -> Streaming (Either GRPCIOError ())
streamSend :: StreamSend
streamSend = fmap void . P.request . Just
pattern RecvMsgRslt mmsg <- Right [OpRecvMessageResult mmsg]