gRPC-haskell/core/src/Network/GRPC/LowLevel/Op.hs

328 lines
14 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ViewPatterns #-}
module Network.GRPC.LowLevel.Op where
import Control.Exception
import Control.Monad
import Control.Monad.Trans.Except
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Maybe (catMaybes)
import Foreign.C.Types (CInt)
import Foreign.Marshal.Alloc (free, malloc)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (peek)
import Network.GRPC.LowLevel.CompletionQueue
import Network.GRPC.LowLevel.GRPC
import qualified Network.GRPC.Unsafe as C (Call)
import qualified Network.GRPC.Unsafe.ByteBuffer as C
import qualified Network.GRPC.Unsafe.Metadata as C
import qualified Network.GRPC.Unsafe.Op as C
import qualified Network.GRPC.Unsafe.Slice as C
import Network.GRPC.Unsafe.Slice (Slice)
-- | Sum describing all possible send and receive operations that can be batched
-- and executed by gRPC. Usually these are processed in a handful of
-- combinations depending on the 'MethodType' of the call being run.
data Op = OpSendInitialMetadata MetadataMap
| OpSendMessage B.ByteString
| OpSendCloseFromClient
| OpSendStatusFromServer MetadataMap C.StatusCode StatusDetails
| OpRecvInitialMetadata
| OpRecvMessage
| OpRecvStatusOnClient
| OpRecvCloseOnServer
deriving (Show)
-- | Container holding the pointers to the C and gRPC data needed to execute the
-- corresponding 'Op'. These are obviously unsafe, and should only be used with
-- 'withOpContexts'.
data OpContext =
OpSendInitialMetadataContext C.MetadataKeyValPtr Int
| OpSendMessageContext (C.ByteBuffer, C.Slice)
| OpSendCloseFromClientContext
| OpSendStatusFromServerContext C.MetadataKeyValPtr Int C.StatusCode Slice
| OpRecvInitialMetadataContext (Ptr C.MetadataArray)
| OpRecvMessageContext (Ptr C.ByteBuffer)
| OpRecvStatusOnClientContext (Ptr C.MetadataArray) (Ptr C.StatusCode) Slice
| OpRecvCloseOnServerContext (Ptr CInt)
deriving Show
-- | Length we pass to gRPC for receiving status details
-- when processing 'OpRecvStatusOnClient'. It appears that gRPC actually ignores
-- this length and reallocates a longer string if necessary.
defaultStatusStringLen :: Int
defaultStatusStringLen = 128
-- | Allocates and initializes the 'Opcontext' corresponding to the given 'Op'.
createOpContext :: Op -> IO OpContext
createOpContext (OpSendInitialMetadata m) =
uncurry OpSendInitialMetadataContext <$> C.createMetadata m
createOpContext (OpSendMessage bs) =
fmap OpSendMessageContext (C.createByteBuffer bs)
createOpContext (OpSendCloseFromClient) = return OpSendCloseFromClientContext
createOpContext (OpSendStatusFromServer m code (StatusDetails str)) =
uncurry OpSendStatusFromServerContext
<$> C.createMetadata m
<*> return code
<*> C.byteStringToSlice str
createOpContext OpRecvInitialMetadata =
fmap OpRecvInitialMetadataContext C.metadataArrayCreate
createOpContext OpRecvMessage =
fmap OpRecvMessageContext C.createReceivingByteBuffer
createOpContext OpRecvStatusOnClient = do
pmetadata <- C.metadataArrayCreate
pstatus <- C.createStatusCodePtr
slice <- C.grpcSliceMalloc defaultStatusStringLen
return $ OpRecvStatusOnClientContext pmetadata pstatus slice
createOpContext OpRecvCloseOnServer =
fmap OpRecvCloseOnServerContext $ malloc
-- | Mutates the given raw array of ops at the given index according to the
-- given 'OpContext'.
setOpArray :: C.OpArray -> Int -> OpContext -> IO ()
setOpArray arr i (OpSendInitialMetadataContext kvs l) =
C.opSendInitialMetadata arr i kvs l
setOpArray arr i (OpSendMessageContext (bb,_)) =
C.opSendMessage arr i bb
setOpArray arr i OpSendCloseFromClientContext =
C.opSendCloseClient arr i
setOpArray arr i (OpSendStatusFromServerContext kvs l code details) =
C.opSendStatusServer arr i l kvs code details
setOpArray arr i (OpRecvInitialMetadataContext pmetadata) =
C.opRecvInitialMetadata arr i pmetadata
setOpArray arr i (OpRecvMessageContext pbb) =
C.opRecvMessage arr i pbb
setOpArray arr i (OpRecvStatusOnClientContext pmetadata pstatus slice) =
C.opRecvStatusClient arr i pmetadata pstatus slice
setOpArray arr i (OpRecvCloseOnServerContext pcancelled) = do
C.opRecvCloseServer arr i pcancelled
-- | Cleans up an 'OpContext'.
freeOpContext :: OpContext -> IO ()
freeOpContext (OpSendInitialMetadataContext m _) = C.metadataFree m
freeOpContext (OpSendMessageContext (bb, s)) =
C.grpcByteBufferDestroy bb >> C.freeSlice s
freeOpContext OpSendCloseFromClientContext = return ()
freeOpContext (OpSendStatusFromServerContext metadata _ _ s) =
C.metadataFree metadata >> C.freeSlice s
freeOpContext (OpRecvInitialMetadataContext metadata) =
C.metadataArrayDestroy metadata
freeOpContext (OpRecvMessageContext pbb) =
C.destroyReceivingByteBuffer pbb
freeOpContext (OpRecvStatusOnClientContext metadata pcode slice) = do
C.metadataArrayDestroy metadata
C.destroyStatusCodePtr pcode
C.freeSlice slice
freeOpContext (OpRecvCloseOnServerContext pcancelled) =
grpcDebug ("freeOpContext: freeing pcancelled: " ++ show pcancelled)
>> free pcancelled
-- | Allocates an `OpArray` and a list of `OpContext`s from the given list of
-- `Op`s.
withOpArrayAndCtxts :: [Op] -> ((C.OpArray, [OpContext]) -> IO a) -> IO a
withOpArrayAndCtxts ops = bracket setup teardown
where setup = do ctxts <- mapM createOpContext ops
let l = length ops
arr <- C.opArrayCreate l
sequence_ $ zipWith (setOpArray arr) [0..l-1] ctxts
return (arr, ctxts)
teardown (arr, ctxts) = do C.opArrayDestroy arr (length ctxts)
mapM_ freeOpContext ctxts
-- | Container holding GC-managed results for 'Op's which receive data.
data OpRecvResult =
OpRecvInitialMetadataResult MetadataMap
| OpRecvMessageResult (Maybe B.ByteString)
-- ^ If a streaming call is in progress and the stream terminates normally,
-- or If the client or server dies, we might not receive a response body, in
-- which case this will be 'Nothing'.
| OpRecvStatusOnClientResult MetadataMap C.StatusCode B.ByteString
| OpRecvCloseOnServerResult Bool -- ^ True if call was cancelled.
deriving (Show)
-- | For the given 'OpContext', if the 'Op' receives data, copies the data out
-- of the 'OpContext' and into GC-managed Haskell types. After this, it is safe
-- to destroy the 'OpContext'.
resultFromOpContext :: OpContext -> IO (Maybe OpRecvResult)
resultFromOpContext (OpRecvInitialMetadataContext pmetadata) = do
grpcDebug "resultFromOpContext: OpRecvInitialMetadataContext"
metadata <- peek pmetadata
metadataMap <- C.getAllMetadataArray metadata
return $ Just $ OpRecvInitialMetadataResult metadataMap
resultFromOpContext (OpRecvMessageContext pbb) = do
grpcDebug "resultFromOpContext: OpRecvMessageContext"
bb@(C.ByteBuffer bbptr) <- peek pbb
if bbptr == nullPtr
then do grpcDebug "resultFromOpContext: WARNING: got empty message."
return $ Just $ OpRecvMessageResult Nothing
else do bs <- C.copyByteBufferToByteString bb
grpcDebug $ "resultFromOpContext: bb copied: " ++ show bs
return $ Just $ OpRecvMessageResult (Just bs)
resultFromOpContext (OpRecvStatusOnClientContext pmetadata pcode pstr) = do
grpcDebug "resultFromOpContext: OpRecvStatusOnClientContext"
metadata <- peek pmetadata
metadataMap <- C.getAllMetadataArray metadata
code <- C.derefStatusCodePtr pcode
statusInfo <- C.sliceToByteString pstr
return $ Just $ OpRecvStatusOnClientResult metadataMap code statusInfo
resultFromOpContext (OpRecvCloseOnServerContext pcancelled) = do
grpcDebug "resultFromOpContext: OpRecvCloseOnServerContext"
cancelled <- fmap (\x -> if x > 0 then True else False)
(peek pcancelled)
return $ Just $ OpRecvCloseOnServerResult cancelled
resultFromOpContext _ = do
grpcDebug "resultFromOpContext: saw non-result op type."
return Nothing
-- | For a given call, run the given 'Op's on the given completion queue with
-- the given tag. Blocks until the ops are complete or the deadline on the
-- associated call has been reached.
-- TODO: now that we distinguish between different types
-- of calls at the type level, we could try to limit the input 'Op's more
-- appropriately. E.g., we don't use an 'OpRecvInitialMetadata' when receiving a
-- registered call, because gRPC handles that for us.
-- TODO: the list of 'Op's type is less specific than it could be. There are
-- only a few different sequences of 'Op's we will see in practice. Once we
-- figure out what those are, we should create a more specific sum
-- type. However, since ops can fail, the list of 'OpRecvResult' returned by
-- 'runOps' can vary in their contents and are perhaps less amenable to
-- simplification. In the meantime, from looking at the core tests, it looks
-- like it is safe to say that we always get a
-- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS error if we use the same 'Op' twice in
-- the same batch, so we might want to change the list to a set. I don't think
-- order matters within a batch. Need to check.
runOps :: C.Call
-- ^ 'Call' that this batch is associated with. One call can be
-- associated with many batches.
-> CompletionQueue
-- ^ Queue on which our tag will be placed once our ops are done
-- running.
-> [Op]
-- ^ The list of 'Op's to execute.
-> IO (Either GRPCIOError [OpRecvResult])
runOps call cq ops =
let l = length ops in
-- It is crucial to mask exceptions here. If we dont do this, we can
-- run into the following situation:
--
-- 1. We allocate an OpContext, e.g., OpRecvMessageContext and the corresponding ByteBuffer.
-- 2. We pass the buffer to gRPC in startBatch.
-- 3. If we now get an exception we will free the ByteBuffer.
-- 4. gRPC can now end up writing to the freed ByteBuffer and we get a heap corruption.
withOpArrayAndCtxts ops $ \(opArray, contexts) -> mask_ $ do
grpcDebug $ "runOps: allocated op contexts: " ++ show contexts
tag <- newTag cq
grpcDebug $ "runOps: tag: " ++ show tag
callError <- startBatch cq call opArray l tag
grpcDebug $ "runOps: called start_batch. callError: "
++ (show callError)
case callError of
Left x -> return $ Left x
Right () -> do
ev <- pluck cq tag Nothing
grpcDebug $ "runOps: pluck returned " ++ show ev
case ev of
Right () -> do
grpcDebug "runOps: got good op; starting."
fmap (Right . catMaybes) $ mapM resultFromOpContext contexts
Left err -> return $ Left err
runOps' :: C.Call
-> CompletionQueue
-> [Op]
-> ExceptT GRPCIOError IO [OpRecvResult]
runOps' c cq = ExceptT . runOps c cq
-- | If response status info is present in the given 'OpRecvResult's, returns
-- a tuple of trailing metadata, status code, and status details.
extractStatusInfo :: [OpRecvResult]
-> Maybe (MetadataMap, C.StatusCode, B.ByteString)
extractStatusInfo [] = Nothing
extractStatusInfo (OpRecvStatusOnClientResult meta code details:_) =
Just (meta, code, details)
extractStatusInfo (_:xs) = extractStatusInfo xs
--------------------------------------------------------------------------------
-- Types and helpers for common ops batches
type SendSingle a
= C.Call
-> CompletionQueue
-> a
-> ExceptT GRPCIOError IO ()
type RecvSingle a
= C.Call
-> CompletionQueue
-> ExceptT GRPCIOError IO a
pattern RecvMsgRslt :: Maybe ByteString -> Either a [OpRecvResult]
pattern RecvMsgRslt mmsg <- Right [OpRecvMessageResult mmsg]
sendSingle :: SendSingle Op
sendSingle c cq op = void (runOps' c cq [op])
sendInitialMetadata :: SendSingle MetadataMap
sendInitialMetadata c cq = sendSingle c cq . OpSendInitialMetadata
sendStatusFromServer :: SendSingle (MetadataMap, C.StatusCode, StatusDetails)
sendStatusFromServer c cq (md, st, ds) =
sendSingle c cq (OpSendStatusFromServer md st ds)
recvInitialMessage :: RecvSingle ByteString
recvInitialMessage c cq = ExceptT (streamRecvPrim c cq ) >>= \case
Nothing -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMessage: no message.")
Just bs -> return bs
recvInitialMetadata :: RecvSingle MetadataMap
recvInitialMetadata c cq = runOps' c cq [OpRecvInitialMetadata] >>= \case
[OpRecvInitialMetadataResult md]
-> return md
_ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMetadata")
recvInitialMsgMD :: RecvSingle (Maybe ByteString, MetadataMap)
recvInitialMsgMD c cq = runOps' c cq [OpRecvInitialMetadata, OpRecvMessage] >>= \case
[ OpRecvInitialMetadataResult md, OpRecvMessageResult mmsg]
-> return (mmsg, md)
_ -> throwE (GRPCIOInternalUnexpectedRecv "recvInitialMsgMD")
recvStatusOnClient :: RecvSingle (MetadataMap, C.StatusCode, StatusDetails)
recvStatusOnClient c cq = runOps' c cq [OpRecvStatusOnClient] >>= \case
[OpRecvStatusOnClientResult md st ds]
-> return (md, st, StatusDetails ds)
_ -> throwE (GRPCIOInternalUnexpectedRecv "recvStatusOnClient")
--------------------------------------------------------------------------------
-- Streaming types and helpers
type StreamRecv a = IO (Either GRPCIOError (Maybe a))
streamRecvPrim :: C.Call -> CompletionQueue -> StreamRecv ByteString
streamRecvPrim c cq = f <$> runOps c cq [OpRecvMessage]
where
f (RecvMsgRslt mmsg) = Right mmsg
f Right{} = Left (GRPCIOInternalUnexpectedRecv "streamRecvPrim")
f (Left e) = Left e
type StreamSend a = a -> IO (Either GRPCIOError ())
streamSendPrim :: C.Call -> CompletionQueue -> StreamSend ByteString
streamSendPrim c cq bs = f <$> runOps c cq [OpSendMessage bs]
where
f (Right []) = Right ()
f Right{} = Left (GRPCIOInternalUnexpectedRecv "streamSendPrim")
f (Left e) = Left e
type WritesDone = IO (Either GRPCIOError ())
writesDonePrim :: C.Call -> CompletionQueue -> WritesDone
writesDonePrim c cq = f <$> runOps c cq [OpSendCloseFromClient]
where
f (Right []) = Right ()
f Right{} = Left (GRPCIOInternalUnexpectedRecv "writesDonePrim")
f (Left e) = Left e