{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE StandaloneDeriving    #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE PatternGuards         #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE EmptyDataDecls        #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE ImpredicativeTypes    #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE MultiParamTypeClasses #-}

-- | An exchange type that broadcasts all incomings 'Post' messages.
module Control.Distributed.Process.Execution.Exchange.Broadcast
  (
    broadcastExchange
  , broadcastExchangeT
  , broadcastClient
  , bindToBroadcaster
  , BroadcastExchange
  ) where

import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TChan
  ( TChan
  , newBroadcastTChanIO
  , dupTChan
  , readTChan
  , writeTChan
  )
import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , MonitorRef
  , ProcessMonitorNotification(..)
  , ProcessId
  , SendPort
  , processNodeId
  , getSelfPid
  , getSelfNode
  , liftIO
  , newChan
  , sendChan
  , unsafeSend
  , unsafeSendChan
  , receiveWait
  , match
  , matchIf
  , die
  , handleMessage
  , Match
  )
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable()
import Control.Distributed.Process.Execution.Exchange.Internal
  ( startExchange
  , configureExchange
  , Message(..)
  , Exchange(..)
  , ExchangeType(..)
  , applyHandlers
  )
import Control.Distributed.Process.Extras.Internal.Types
  ( Channel
  , ServerDisconnected(..)
  )
import Control.Distributed.Process.Extras.Internal.Unsafe -- see [note: pcopy]
  ( PCopy
  , pCopy
  , pUnwrap
  , matchChanP
  , InputStream(Null)
  , newInputStream
  )
import Control.Monad (forM_, void)
import Data.Accessor
  ( Accessor
  , accessor
  , (^:)
  )
import Data.Binary
import qualified Data.Foldable as Foldable (toList)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable (Typeable)
import GHC.Generics

-- newtype RoutingTable r =
--  RoutingTable { routes :: (Map String (Map ProcessId r)) }

-- [note: BindSTM, BindPort and safety]
-- We keep these two /bind types/ separate, since only one of them
-- is truly serializable. The risk of unifying them is that at some
-- later time a maintainer might not realise that BindSTM cannot be
-- sent over the wire due to our use of PCopy.
--

data BindPort = BindPort { BindPort -> ProcessId
portClient :: !ProcessId
                         , BindPort -> SendPort Message
portSend   :: !(SendPort Message)
                         } deriving (Typeable, (forall x. BindPort -> Rep BindPort x)
-> (forall x. Rep BindPort x -> BindPort) -> Generic BindPort
forall x. Rep BindPort x -> BindPort
forall x. BindPort -> Rep BindPort x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BindPort -> Rep BindPort x
from :: forall x. BindPort -> Rep BindPort x
$cto :: forall x. Rep BindPort x -> BindPort
to :: forall x. Rep BindPort x -> BindPort
Generic)
instance Binary BindPort where
instance NFData BindPort where

data BindSTM =
    BindSTM  { BindSTM -> ProcessId
stmClient :: !ProcessId
             , BindSTM -> SendPort (PCopy (InputStream Message))
stmSend   :: !(SendPort (PCopy (InputStream Message)))
             } deriving (Typeable)
{-  | forall r. (Routable r) =>
    BindR    { client :: !ProcessId
             , key    :: !String
             , chanC  :: !r
             }
  deriving (Typeable, Generic)
-}

data OutputStream =
    WriteChan (SendPort Message)
  | WriteSTM  (Message -> STM ())
--  | WriteP    ProcessId
  | NoWrite
  deriving (Typeable)

data Binding = Binding { Binding -> OutputStream
outputStream :: !OutputStream
                       , Binding -> InputStream Message
inputStream  :: !(InputStream Message)
                       }
             | PidBinding !ProcessId
  deriving (Typeable)

data BindOk = BindOk
  deriving (Typeable, (forall x. BindOk -> Rep BindOk x)
-> (forall x. Rep BindOk x -> BindOk) -> Generic BindOk
forall x. Rep BindOk x -> BindOk
forall x. BindOk -> Rep BindOk x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BindOk -> Rep BindOk x
from :: forall x. BindOk -> Rep BindOk x
$cto :: forall x. Rep BindOk x -> BindOk
to :: forall x. Rep BindOk x -> BindOk
Generic)
instance Binary BindOk where
instance NFData BindOk where

data BindFail = BindFail !String
  deriving (Typeable, (forall x. BindFail -> Rep BindFail x)
-> (forall x. Rep BindFail x -> BindFail) -> Generic BindFail
forall x. Rep BindFail x -> BindFail
forall x. BindFail -> Rep BindFail x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BindFail -> Rep BindFail x
from :: forall x. BindFail -> Rep BindFail x
$cto :: forall x. Rep BindFail x -> BindFail
to :: forall x. Rep BindFail x -> BindFail
Generic)
instance Binary BindFail where
instance NFData BindFail where

data BindPlease = BindPlease
  deriving (Typeable, (forall x. BindPlease -> Rep BindPlease x)
-> (forall x. Rep BindPlease x -> BindPlease) -> Generic BindPlease
forall x. Rep BindPlease x -> BindPlease
forall x. BindPlease -> Rep BindPlease x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BindPlease -> Rep BindPlease x
from :: forall x. BindPlease -> Rep BindPlease x
$cto :: forall x. Rep BindPlease x -> BindPlease
to :: forall x. Rep BindPlease x -> BindPlease
Generic)
instance Binary BindPlease where
instance NFData BindPlease where

type BroadcastClients = Map ProcessId Binding
data BroadcastEx =
  BroadcastEx { BroadcastEx -> BroadcastClients
_routingTable   :: !BroadcastClients
              , BroadcastEx -> TChan Message
channel         :: !(TChan Message)
              }

type BroadcastExchange = ExchangeType BroadcastEx

--------------------------------------------------------------------------------
-- Starting/Running the Exchange                                              --
--------------------------------------------------------------------------------

-- | Start a new /broadcast exchange/ and return a handle to the exchange.
broadcastExchange :: Process Exchange
broadcastExchange :: Process Exchange
broadcastExchange = Process BroadcastExchange
broadcastExchangeT Process BroadcastExchange
-> (BroadcastExchange -> Process Exchange) -> Process Exchange
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= BroadcastExchange -> Process Exchange
forall s. ExchangeType s -> Process Exchange
startExchange

-- | The 'ExchangeType' of a broadcast exchange. Can be combined with the
-- @startSupervisedRef@ and @startSupervised@ APIs.
--
broadcastExchangeT :: Process BroadcastExchange
broadcastExchangeT :: Process BroadcastExchange
broadcastExchangeT = do
  TChan Message
ch <- IO (TChan Message) -> Process (TChan Message)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TChan Message)
forall a. IO (TChan a)
newBroadcastTChanIO
  BroadcastExchange -> Process BroadcastExchange
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastExchange -> Process BroadcastExchange)
-> BroadcastExchange -> Process BroadcastExchange
forall a b. (a -> b) -> a -> b
$ ExchangeType { name :: String
name        = String
"BroadcastExchange"
                        , state :: BroadcastEx
state       = BroadcastClients -> TChan Message -> BroadcastEx
BroadcastEx BroadcastClients
forall k a. Map k a
Map.empty TChan Message
ch
                        , configureEx :: BroadcastEx -> Message -> Process BroadcastEx
configureEx = BroadcastEx -> Message -> Process BroadcastEx
apiConfigure
                        , routeEx :: BroadcastEx -> Message -> Process BroadcastEx
routeEx     = BroadcastEx -> Message -> Process BroadcastEx
apiRoute
                        }

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- | Create a binding to the given /broadcast exchange/ for the calling process
-- and return an 'InputStream' that can be used in the @expect@ and
-- @receiveWait@ family of messaging primitives. This form of client interaction
-- helps avoid cluttering the caller's mailbox with 'Message' data, since the
-- 'InputChannel' provides a separate input stream (in a similar fashion to
-- a typed channel).
-- Example:
--
-- > is <- broadcastClient ex
-- > msg <- receiveWait [ matchInputStream is ]
-- > handleMessage (payload msg)
--
broadcastClient :: Exchange -> Process (InputStream Message)
broadcastClient :: Exchange -> Process (InputStream Message)
broadcastClient ex :: Exchange
ex@Exchange{String
ProcessId
ControlPort ControlMessage
pid :: ProcessId
cchan :: ControlPort ControlMessage
xType :: String
pid :: Exchange -> ProcessId
cchan :: Exchange -> ControlPort ControlMessage
xType :: Exchange -> String
..} = do
  NodeId
myNode <- Process NodeId
getSelfNode
  ProcessId
us     <- Process ProcessId
getSelfPid
  if ProcessId -> NodeId
processNodeId ProcessId
pid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
myNode -- see [note: pcopy]
     then do (SendPort (PCopy (InputStream Message))
sp, ReceivePort (PCopy (InputStream Message))
rp) <- Process
  (SendPort (PCopy (InputStream Message)),
   ReceivePort (PCopy (InputStream Message)))
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
             Exchange -> PCopy BindSTM -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
configureExchange Exchange
ex (PCopy BindSTM -> Process ()) -> PCopy BindSTM -> Process ()
forall a b. (a -> b) -> a -> b
$ BindSTM -> PCopy BindSTM
forall a. Typeable a => a -> PCopy a
pCopy (ProcessId -> SendPort (PCopy (InputStream Message)) -> BindSTM
BindSTM ProcessId
us SendPort (PCopy (InputStream Message))
sp)
             MonitorRef
mRef <- ProcessId -> Process MonitorRef
P.monitor ProcessId
pid
             Process (InputStream Message)
-> Process () -> Process (InputStream Message)
forall a b. Process a -> Process b -> Process a
P.finally ([Match (InputStream Message)] -> Process (InputStream Message)
forall b. [Match b] -> Process b
receiveWait [ ReceivePort (PCopy (InputStream Message))
-> Match (InputStream Message)
forall m. Typeable m => ReceivePort (PCopy m) -> Match m
matchChanP ReceivePort (PCopy (InputStream Message))
rp
                                    , MonitorRef -> Match (InputStream Message)
handleServerFailure MonitorRef
mRef ])
                       (MonitorRef -> Process ()
P.unmonitor MonitorRef
mRef)
     else do (SendPort Message
sp, ReceivePort Message
rp) <- Process (SendPort Message, ReceivePort Message)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan :: Process (Channel Message)
             Exchange -> BindPort -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
configureExchange Exchange
ex (BindPort -> Process ()) -> BindPort -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> SendPort Message -> BindPort
BindPort ProcessId
us SendPort Message
sp
             MonitorRef
mRef <- ProcessId -> Process MonitorRef
P.monitor ProcessId
pid
             Process (InputStream Message)
-> Process () -> Process (InputStream Message)
forall a b. Process a -> Process b -> Process a
P.finally ([Match (InputStream Message)] -> Process (InputStream Message)
forall b. [Match b] -> Process b
receiveWait [
                           (BindOk -> Process (InputStream Message))
-> Match (InputStream Message)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(BindOk
_ :: BindOk)   -> InputStream Message -> Process (InputStream Message)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (InputStream Message -> Process (InputStream Message))
-> InputStream Message -> Process (InputStream Message)
forall a b. (a -> b) -> a -> b
$ Either (ReceivePort Message) (STM Message) -> InputStream Message
forall a.
Typeable a =>
Either (ReceivePort a) (STM a) -> InputStream a
newInputStream (Either (ReceivePort Message) (STM Message) -> InputStream Message)
-> Either (ReceivePort Message) (STM Message)
-> InputStream Message
forall a b. (a -> b) -> a -> b
$ ReceivePort Message -> Either (ReceivePort Message) (STM Message)
forall a b. a -> Either a b
Left ReceivePort Message
rp)
                         , (BindFail -> Process (InputStream Message))
-> Match (InputStream Message)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(BindFail
f :: BindFail) -> BindFail -> Process (InputStream Message)
forall a b. Serializable a => a -> Process b
die BindFail
f)
                         , MonitorRef -> Match (InputStream Message)
handleServerFailure MonitorRef
mRef
                         ])
                       (MonitorRef -> Process ()
P.unmonitor MonitorRef
mRef)

-- | Bind the calling process to the given /broadcast exchange/. For each
-- 'Message' the exchange receives, /only the payload will be sent/
-- to the calling process' mailbox.
--
-- Example:
--
-- (producer)
-- > post ex "Hello"
--
-- (consumer)
-- > bindToBroadcaster ex
-- > expect >>= liftIO . putStrLn
--
bindToBroadcaster :: Exchange -> Process ()
bindToBroadcaster :: Exchange -> Process ()
bindToBroadcaster ex :: Exchange
ex@Exchange{String
ProcessId
ControlPort ControlMessage
pid :: Exchange -> ProcessId
cchan :: Exchange -> ControlPort ControlMessage
xType :: Exchange -> String
pid :: ProcessId
cchan :: ControlPort ControlMessage
xType :: String
..} = do
  ProcessId
us <- Process ProcessId
getSelfPid
  Exchange -> (BindPlease, ProcessId) -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
configureExchange Exchange
ex ((BindPlease, ProcessId) -> Process ())
-> (BindPlease, ProcessId) -> Process ()
forall a b. (a -> b) -> a -> b
$ (BindPlease
BindPlease, ProcessId
us)

--------------------------------------------------------------------------------
-- Exchage Definition/State & API Handlers                                    --
--------------------------------------------------------------------------------

apiRoute :: BroadcastEx -> Message -> Process BroadcastEx
apiRoute :: BroadcastEx -> Message -> Process BroadcastEx
apiRoute ex :: BroadcastEx
ex@BroadcastEx{BroadcastClients
TChan Message
_routingTable :: BroadcastEx -> BroadcastClients
channel :: BroadcastEx -> TChan Message
_routingTable :: BroadcastClients
channel :: TChan Message
..} Message
msg = do
  IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan Message -> Message -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Message
channel Message
msg
  [Binding] -> (Binding -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (BroadcastClients -> [Binding]
forall a. Map ProcessId a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList BroadcastClients
_routingTable) ((Binding -> Process ()) -> Process ())
-> (Binding -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Message -> Binding -> Process ()
routeToClient Message
msg
  BroadcastEx -> Process BroadcastEx
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return BroadcastEx
ex
  where
    routeToClient :: Message -> Binding -> Process ()
routeToClient Message
m (PidBinding ProcessId
p)  = Message -> ProcessId -> Process ()
P.forward (Message -> Message
payload Message
m) ProcessId
p
    routeToClient Message
m b :: Binding
b@(Binding OutputStream
_ InputStream Message
_) = OutputStream -> Message -> Process ()
writeToStream (Binding -> OutputStream
outputStream Binding
b) Message
m

-- TODO: implement unbind!!?

apiConfigure :: BroadcastEx -> P.Message -> Process BroadcastEx
apiConfigure :: BroadcastEx -> Message -> Process BroadcastEx
apiConfigure BroadcastEx
ex Message
msg = do
  -- for unsafe / non-serializable message passing hacks, see [note: pcopy]
  BroadcastEx
-> Message
-> [Message -> Process (Maybe BroadcastEx)]
-> Process BroadcastEx
forall a.
a -> Message -> [Message -> Process (Maybe a)] -> Process a
applyHandlers BroadcastEx
ex Message
msg ([Message -> Process (Maybe BroadcastEx)] -> Process BroadcastEx)
-> [Message -> Process (Maybe BroadcastEx)] -> Process BroadcastEx
forall a b. (a -> b) -> a -> b
$ [ \Message
m -> Message
-> (BindPort -> Process BroadcastEx) -> Process (Maybe BroadcastEx)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m (BroadcastEx -> BindPort -> Process BroadcastEx
handleBindPort BroadcastEx
ex)
                         , \Message
m -> BroadcastEx -> Message -> Process (Maybe BroadcastEx)
handleBindSTM BroadcastEx
ex Message
m
                         , \Message
m -> Message
-> ((BindPlease, ProcessId) -> Process BroadcastEx)
-> Process (Maybe BroadcastEx)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m (BroadcastEx -> (BindPlease, ProcessId) -> Process BroadcastEx
forall {m :: * -> *}.
Monad m =>
BroadcastEx -> (BindPlease, ProcessId) -> m BroadcastEx
handleBindPlease BroadcastEx
ex)
                         , \Message
m -> Message
-> (ProcessMonitorNotification -> Process BroadcastEx)
-> Process (Maybe BroadcastEx)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m (BroadcastEx -> ProcessMonitorNotification -> Process BroadcastEx
forall {m :: * -> *}.
Monad m =>
BroadcastEx -> ProcessMonitorNotification -> m BroadcastEx
handleMonitorSignal BroadcastEx
ex)
                         , (Process (Maybe BroadcastEx)
-> Message -> Process (Maybe BroadcastEx)
forall a b. a -> b -> a
const (Process (Maybe BroadcastEx)
 -> Message -> Process (Maybe BroadcastEx))
-> Process (Maybe BroadcastEx)
-> Message
-> Process (Maybe BroadcastEx)
forall a b. (a -> b) -> a -> b
$ Maybe BroadcastEx -> Process (Maybe BroadcastEx)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe BroadcastEx -> Process (Maybe BroadcastEx))
-> Maybe BroadcastEx -> Process (Maybe BroadcastEx)
forall a b. (a -> b) -> a -> b
$ BroadcastEx -> Maybe BroadcastEx
forall a. a -> Maybe a
Just BroadcastEx
ex)
                         ]
  where
    handleBindPlease :: BroadcastEx -> (BindPlease, ProcessId) -> m BroadcastEx
handleBindPlease BroadcastEx
ex' (BindPlease
BindPlease, ProcessId
p) = do
      case BroadcastEx -> ProcessId -> Maybe Binding
lookupBinding BroadcastEx
ex' ProcessId
p of
        Maybe Binding
Nothing -> BroadcastEx -> m BroadcastEx
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastEx -> m BroadcastEx) -> BroadcastEx -> m BroadcastEx
forall a b. (a -> b) -> a -> b
$ (Accessor BroadcastEx BroadcastClients
routingTable Accessor BroadcastEx BroadcastClients
-> (BroadcastClients -> BroadcastClients)
-> BroadcastEx
-> BroadcastEx
forall r a. T r a -> (a -> a) -> r -> r
^: ProcessId -> Binding -> BroadcastClients -> BroadcastClients
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ProcessId
p (ProcessId -> Binding
PidBinding ProcessId
p)) BroadcastEx
ex'
        Just Binding
_  -> BroadcastEx -> m BroadcastEx
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BroadcastEx
ex'

    handleMonitorSignal :: BroadcastEx -> ProcessMonitorNotification -> m BroadcastEx
handleMonitorSignal BroadcastEx
bx (ProcessMonitorNotification MonitorRef
_ ProcessId
p DiedReason
_) =
      BroadcastEx -> m BroadcastEx
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastEx -> m BroadcastEx) -> BroadcastEx -> m BroadcastEx
forall a b. (a -> b) -> a -> b
$ (Accessor BroadcastEx BroadcastClients
routingTable Accessor BroadcastEx BroadcastClients
-> (BroadcastClients -> BroadcastClients)
-> BroadcastEx
-> BroadcastEx
forall r a. T r a -> (a -> a) -> r -> r
^: ProcessId -> BroadcastClients -> BroadcastClients
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ProcessId
p) BroadcastEx
bx

    handleBindSTM :: BroadcastEx -> Message -> Process (Maybe BroadcastEx)
handleBindSTM ex' :: BroadcastEx
ex'@BroadcastEx{BroadcastClients
TChan Message
_routingTable :: BroadcastEx -> BroadcastClients
channel :: BroadcastEx -> TChan Message
_routingTable :: BroadcastClients
channel :: TChan Message
..} Message
msg' = do
      Maybe BindSTM
bind' <- Message -> Process (Maybe BindSTM)
forall m. Typeable m => Message -> Process (Maybe m)
pUnwrap Message
msg' :: Process (Maybe BindSTM) -- see [note: pcopy]
      case Maybe BindSTM
bind' of
        Maybe BindSTM
Nothing -> Maybe BroadcastEx -> Process (Maybe BroadcastEx)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe BroadcastEx
forall a. Maybe a
Nothing
        Just BindSTM
s  -> do
          let binding :: Maybe Binding
binding = BroadcastEx -> ProcessId -> Maybe Binding
lookupBinding BroadcastEx
ex' (BindSTM -> ProcessId
stmClient BindSTM
s)
          case Maybe Binding
binding of
            Maybe Binding
Nothing -> BroadcastEx -> BindSTM -> Process BroadcastEx
createBinding BroadcastEx
ex' BindSTM
s Process BroadcastEx
-> (BroadcastEx -> Process (Maybe BroadcastEx))
-> Process (Maybe BroadcastEx)
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \BroadcastEx
ex'' -> BroadcastEx -> Message -> Process (Maybe BroadcastEx)
handleBindSTM BroadcastEx
ex'' Message
msg'
            Just Binding
b  -> SendPort (PCopy (InputStream Message)) -> Binding -> Process ()
sendBinding (BindSTM -> SendPort (PCopy (InputStream Message))
stmSend BindSTM
s) Binding
b Process ()
-> Process (Maybe BroadcastEx) -> Process (Maybe BroadcastEx)
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe BroadcastEx -> Process (Maybe BroadcastEx)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastEx -> Maybe BroadcastEx
forall a. a -> Maybe a
Just BroadcastEx
ex')

    createBinding :: BroadcastEx -> BindSTM -> Process BroadcastEx
createBinding bEx' :: BroadcastEx
bEx'@BroadcastEx{BroadcastClients
TChan Message
_routingTable :: BroadcastEx -> BroadcastClients
channel :: BroadcastEx -> TChan Message
_routingTable :: BroadcastClients
channel :: TChan Message
..} BindSTM{SendPort (PCopy (InputStream Message))
ProcessId
stmClient :: BindSTM -> ProcessId
stmSend :: BindSTM -> SendPort (PCopy (InputStream Message))
stmClient :: ProcessId
stmSend :: SendPort (PCopy (InputStream Message))
..} = do
      Process MonitorRef -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process MonitorRef -> Process ())
-> Process MonitorRef -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process MonitorRef
P.monitor ProcessId
stmClient
      TChan Message
nch <- IO (TChan Message) -> Process (TChan Message)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TChan Message) -> Process (TChan Message))
-> IO (TChan Message) -> Process (TChan Message)
forall a b. (a -> b) -> a -> b
$ STM (TChan Message) -> IO (TChan Message)
forall a. STM a -> IO a
atomically (STM (TChan Message) -> IO (TChan Message))
-> STM (TChan Message) -> IO (TChan Message)
forall a b. (a -> b) -> a -> b
$ TChan Message -> STM (TChan Message)
forall a. TChan a -> STM (TChan a)
dupTChan TChan Message
channel
      let istr :: InputStream Message
istr = Either (ReceivePort Message) (STM Message) -> InputStream Message
forall a.
Typeable a =>
Either (ReceivePort a) (STM a) -> InputStream a
newInputStream (Either (ReceivePort Message) (STM Message) -> InputStream Message)
-> Either (ReceivePort Message) (STM Message)
-> InputStream Message
forall a b. (a -> b) -> a -> b
$ STM Message -> Either (ReceivePort Message) (STM Message)
forall a b. b -> Either a b
Right (TChan Message -> STM Message
forall a. TChan a -> STM a
readTChan TChan Message
nch)
      let ostr :: OutputStream
ostr = OutputStream
NoWrite -- we write to our own channel, not the broadcast
      let bnd :: Binding
bnd = OutputStream -> InputStream Message -> Binding
Binding OutputStream
ostr InputStream Message
istr
      BroadcastEx -> Process BroadcastEx
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastEx -> Process BroadcastEx)
-> BroadcastEx -> Process BroadcastEx
forall a b. (a -> b) -> a -> b
$ (Accessor BroadcastEx BroadcastClients
routingTable Accessor BroadcastEx BroadcastClients
-> (BroadcastClients -> BroadcastClients)
-> BroadcastEx
-> BroadcastEx
forall r a. T r a -> (a -> a) -> r -> r
^: ProcessId -> Binding -> BroadcastClients -> BroadcastClients
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ProcessId
stmClient Binding
bnd) BroadcastEx
bEx'

    sendBinding :: SendPort (PCopy (InputStream Message)) -> Binding -> Process ()
sendBinding SendPort (PCopy (InputStream Message))
sp' Binding
bs = SendPort (PCopy (InputStream Message))
-> PCopy (InputStream Message) -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
unsafeSendChan SendPort (PCopy (InputStream Message))
sp' (PCopy (InputStream Message) -> Process ())
-> PCopy (InputStream Message) -> Process ()
forall a b. (a -> b) -> a -> b
$ InputStream Message -> PCopy (InputStream Message)
forall a. Typeable a => a -> PCopy a
pCopy (Binding -> InputStream Message
inputStream Binding
bs)

    handleBindPort :: BroadcastEx -> BindPort -> Process BroadcastEx
    handleBindPort :: BroadcastEx -> BindPort -> Process BroadcastEx
handleBindPort x :: BroadcastEx
x@BroadcastEx{BroadcastClients
TChan Message
_routingTable :: BroadcastEx -> BroadcastClients
channel :: BroadcastEx -> TChan Message
_routingTable :: BroadcastClients
channel :: TChan Message
..} BindPort{SendPort Message
ProcessId
portClient :: BindPort -> ProcessId
portSend :: BindPort -> SendPort Message
portClient :: ProcessId
portSend :: SendPort Message
..} = do
      let binding :: Maybe Binding
binding = BroadcastEx -> ProcessId -> Maybe Binding
lookupBinding BroadcastEx
x ProcessId
portClient
      case Maybe Binding
binding of
        Just Binding
_  -> ProcessId -> BindFail -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
unsafeSend ProcessId
portClient (String -> BindFail
BindFail String
"DuplicateBinding") Process () -> Process BroadcastEx -> Process BroadcastEx
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> BroadcastEx -> Process BroadcastEx
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return BroadcastEx
x
        Maybe Binding
Nothing -> do
          let istr :: InputStream a
istr = InputStream a
forall a. InputStream a
Null
          let ostr :: OutputStream
ostr = SendPort Message -> OutputStream
WriteChan SendPort Message
portSend
          let bound :: Binding
bound = OutputStream -> InputStream Message -> Binding
Binding OutputStream
ostr InputStream Message
forall a. InputStream a
istr
          Process MonitorRef -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process MonitorRef -> Process ())
-> Process MonitorRef -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process MonitorRef
P.monitor ProcessId
portClient
          ProcessId -> BindOk -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
unsafeSend ProcessId
portClient BindOk
BindOk
          BroadcastEx -> Process BroadcastEx
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (BroadcastEx -> Process BroadcastEx)
-> BroadcastEx -> Process BroadcastEx
forall a b. (a -> b) -> a -> b
$ (Accessor BroadcastEx BroadcastClients
routingTable Accessor BroadcastEx BroadcastClients
-> (BroadcastClients -> BroadcastClients)
-> BroadcastEx
-> BroadcastEx
forall r a. T r a -> (a -> a) -> r -> r
^: ProcessId -> Binding -> BroadcastClients -> BroadcastClients
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ProcessId
portClient Binding
bound) BroadcastEx
x

    lookupBinding :: BroadcastEx -> ProcessId -> Maybe Binding
lookupBinding BroadcastEx{BroadcastClients
TChan Message
_routingTable :: BroadcastEx -> BroadcastClients
channel :: BroadcastEx -> TChan Message
_routingTable :: BroadcastClients
channel :: TChan Message
..} ProcessId
k = ProcessId -> BroadcastClients -> Maybe Binding
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup ProcessId
k (BroadcastClients -> Maybe Binding)
-> BroadcastClients -> Maybe Binding
forall a b. (a -> b) -> a -> b
$ BroadcastClients
_routingTable

{- [note: pcopy]

We rely on risky techniques here, in order to allow for sharing useful
data that is not really serializable. For Cloud Haskell generally, this is
a bad idea, since we want message passing to work both locally and in a
distributed setting. In this case however, what we're really attempting is
an optimisation, since we only use unsafe PCopy based techniques when dealing
with exchange clients residing on our (local) node.

The PCopy mechanism is defined in the (aptly named) "Unsafe" module.

-}

-- TODO: move handleServerFailure into Primitives.hs

writeToStream :: OutputStream -> Message -> Process ()
writeToStream :: OutputStream -> Message -> Process ()
writeToStream (WriteChan SendPort Message
sp) = SendPort Message -> Message -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort Message
sp  -- see [note: safe remote send]
writeToStream (WriteSTM Message -> STM ()
stm) = IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ())
-> (Message -> IO ()) -> Message -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Message -> STM ()) -> Message -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message -> STM ()
stm
writeToStream OutputStream
NoWrite        = Process () -> Message -> Process ()
forall a b. a -> b -> a
const (Process () -> Message -> Process ())
-> Process () -> Message -> Process ()
forall a b. (a -> b) -> a -> b
$ () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
{-# INLINE writeToStream #-}

{- [note: safe remote send]

Although we go to great lengths here to avoid serialization and/or copying
overheads, there are some activities for which we prefer to play it safe.
Chief among these is delivering messages to remote clients. Thankfully, our
unsafe @sendChan@ primitive will crash the caller/sender if there are any
encoding problems, however it is only because we /know/ for certain that
our recipient is remote, that we've chosen to write via a SendPort in the
first place! It makes sense therefore, to use the safe @sendChan@ operation
here, since for a remote call we /cannot/ avoid the overhead of serialization
anyway.

-}

handleServerFailure :: MonitorRef -> Match (InputStream Message)
handleServerFailure :: MonitorRef -> Match (InputStream Message)
handleServerFailure MonitorRef
mRef =
  (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process (InputStream Message))
-> Match (InputStream Message)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
r ProcessId
_ DiedReason
_) -> MonitorRef
r MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
          (\(ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
d) -> ServerDisconnected -> Process (InputStream Message)
forall a b. Serializable a => a -> Process b
die (ServerDisconnected -> Process (InputStream Message))
-> ServerDisconnected -> Process (InputStream Message)
forall a b. (a -> b) -> a -> b
$ DiedReason -> ServerDisconnected
ServerDisconnected DiedReason
d)

routingTable :: Accessor BroadcastEx BroadcastClients
routingTable :: Accessor BroadcastEx BroadcastClients
routingTable = (BroadcastEx -> BroadcastClients)
-> (BroadcastClients -> BroadcastEx -> BroadcastEx)
-> Accessor BroadcastEx BroadcastClients
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor BroadcastEx -> BroadcastClients
_routingTable (\BroadcastClients
r BroadcastEx
e -> BroadcastEx
e { _routingTable = r })