{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE StandaloneDeriving    #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE PatternGuards         #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE EmptyDataDecls        #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE ImpredicativeTypes    #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE MultiParamTypeClasses #-}

-- | A simple API for /routing/, using a custom exchange type.
module Control.Distributed.Process.Execution.Exchange.Router
  ( -- * Types
    HeaderName
  , Binding(..)
  , Bindable
  , BindingSelector
  , RelayType(..)
    -- * Starting a Router
  , router
  , supervisedRouter
  , supervisedRouterRef
    -- * Client (Publishing) API
  , route
  , routeMessage
    -- * Routing via message/binding keys
  , messageKeyRouter
  , bindKey
    -- * Routing via message headers
  , headerContentRouter
  , bindHeader
  ) where

import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , ProcessMonitorNotification(..)
  , ProcessId
  , monitor
  , handleMessage
  , unsafeWrapMessage
  )
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Execution.Exchange.Internal
  ( startExchange
  , startSupervised
  , configureExchange
  , Message(..)
  , Exchange
  , ExchangeType(..)
  , post
  , postMessage
  , applyHandlers
  )
import Control.Distributed.Process.Extras.Internal.Primitives
  ( deliver
  , Resolvable(..)
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Data.Binary
import Data.Foldable (forM_)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics

type HeaderName = String

-- | The binding key used by the built-in key and header based
-- routers.
data Binding =
    BindKey    { Binding -> String
bindingKey :: !String }
  | BindHeader { bindingKey :: !String
               , Binding -> String
headerName :: !HeaderName
               }
  | BindNone
  deriving (Typeable, (forall x. Binding -> Rep Binding x)
-> (forall x. Rep Binding x -> Binding) -> Generic Binding
forall x. Rep Binding x -> Binding
forall x. Binding -> Rep Binding x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Binding -> Rep Binding x
from :: forall x. Binding -> Rep Binding x
$cto :: forall x. Rep Binding x -> Binding
to :: forall x. Rep Binding x -> Binding
Generic, Binding -> Binding -> Bool
(Binding -> Binding -> Bool)
-> (Binding -> Binding -> Bool) -> Eq Binding
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Binding -> Binding -> Bool
== :: Binding -> Binding -> Bool
$c/= :: Binding -> Binding -> Bool
/= :: Binding -> Binding -> Bool
Eq, Int -> Binding -> ShowS
[Binding] -> ShowS
Binding -> String
(Int -> Binding -> ShowS)
-> (Binding -> String) -> ([Binding] -> ShowS) -> Show Binding
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Binding -> ShowS
showsPrec :: Int -> Binding -> ShowS
$cshow :: Binding -> String
show :: Binding -> String
$cshowList :: [Binding] -> ShowS
showList :: [Binding] -> ShowS
Show)
instance Binary Binding where
instance NFData Binding where
instance Hashable Binding where

-- | Things that can be used as binding keys in a router.
class (Hashable k, Eq k, Serializable k) => Bindable k
instance (Hashable k, Eq k, Serializable k) => Bindable k

-- | Used to convert a 'Message' into a 'Bindable' routing key.
type BindingSelector k = (Message -> Process k)

-- | Given to a /router/ to indicate whether clients should
-- receive 'Message' payloads only, or the whole 'Message' object
-- itself.
data RelayType = PayloadOnly | WholeMessage

data State k = State { forall k. State k -> HashMap k (HashSet ProcessId)
bindings  :: !(HashMap k (HashSet ProcessId))
                     , forall k. State k -> BindingSelector k
selector  :: !(BindingSelector k)
                     , forall k. State k -> RelayType
relayType :: !RelayType
                     }

type Router k = ExchangeType (State k)

--------------------------------------------------------------------------------
-- Starting/Running the Exchange                                              --
--------------------------------------------------------------------------------

-- | A router that matches on a 'Message' 'key'. To bind a client @Process@ to
-- such an exchange, use the 'bindKey' function.
messageKeyRouter :: RelayType -> Process Exchange
messageKeyRouter :: RelayType -> Process Exchange
messageKeyRouter RelayType
t = RelayType -> BindingSelector Binding -> Process Exchange
forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process Exchange
router RelayType
t BindingSelector Binding
matchOnKey -- (return . BindKey . key)
  where
    matchOnKey :: Message -> Process Binding
    matchOnKey :: BindingSelector Binding
matchOnKey Message
m = Binding -> Process Binding
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Binding -> Process Binding) -> Binding -> Process Binding
forall a b. (a -> b) -> a -> b
$ String -> Binding
BindKey (Message -> String
key Message
m)

-- | A router that matches on a specific (named) header. To bind a client
-- @Process@ to such an exchange, use the 'bindHeader' function.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
headerContentRouter :: RelayType -> String -> Process Exchange
headerContentRouter RelayType
t String
n = RelayType -> BindingSelector Binding -> Process Exchange
forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process Exchange
router RelayType
t (String -> BindingSelector Binding
forall {m :: * -> *}. Monad m => String -> Message -> m Binding
checkHeaders String
n)
  where
    checkHeaders :: String -> Message -> m Binding
checkHeaders String
hn Message{String
[(String, String)]
Message
key :: Message -> String
key :: String
headers :: [(String, String)]
payload :: Message
headers :: Message -> [(String, String)]
payload :: Message -> Message
..} = do
      case String -> HashMap String String -> Maybe String
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup String
hn ([(String, String)] -> HashMap String String
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
Map.fromList [(String, String)]
headers) of
        Maybe String
Nothing -> Binding -> m Binding
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Binding
BindNone
        Just String
hv -> Binding -> m Binding
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Binding -> m Binding) -> Binding -> m Binding
forall a b. (a -> b) -> a -> b
$ String -> String -> Binding
BindHeader String
hn String
hv

-- | Defines a /router/ exchange. The 'BindingSelector' is used to construct
-- a binding (i.e., an instance of the 'Bindable' type @k@) for each incoming
-- 'Message'. Such bindings are matched against bindings stored in the exchange.
-- Clients of a /router/ exchange are identified by a binding, mapped to
-- one or more 'ProcessId's.
--
-- The format of the bindings, nature of their storage and mechanism for
-- submitting new bindings is implementation dependent (i.e., will vary by
-- exchange type). For example, the 'messageKeyRouter' and 'headerContentRouter'
-- implementations both use the 'Binding' data type, which can represent a
-- 'Message' key or a 'HeaderName' and content. As with all custom exchange
-- types, bindings should be submitted by evaluating 'configureExchange' with
-- a suitable data type.
--
router :: (Bindable k) => RelayType -> BindingSelector k -> Process Exchange
router :: forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process Exchange
router RelayType
t BindingSelector k
s = RelayType -> BindingSelector k -> Process (Router k)
forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process (Router k)
routerT RelayType
t BindingSelector k
s Process (Router k)
-> (Router k -> Process Exchange) -> Process Exchange
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Router k -> Process Exchange
forall s. ExchangeType s -> Process Exchange
startExchange

supervisedRouterRef :: Bindable k
                    => RelayType
                    -> BindingSelector k
                    -> SupervisorPid
                    -> Process (ProcessId, P.Message)
supervisedRouterRef :: forall k.
Bindable k =>
RelayType
-> BindingSelector k -> ProcessId -> Process (ProcessId, Message)
supervisedRouterRef RelayType
t BindingSelector k
sel ProcessId
spid = do
  Exchange
ex <- RelayType -> BindingSelector k -> ProcessId -> Process Exchange
forall k.
Bindable k =>
RelayType -> BindingSelector k -> ProcessId -> Process Exchange
supervisedRouter RelayType
t BindingSelector k
sel ProcessId
spid
  Just ProcessId
pid <- Exchange -> Process (Maybe ProcessId)
forall a. Resolvable a => a -> Process (Maybe ProcessId)
resolve Exchange
ex
  (ProcessId, Message) -> Process (ProcessId, Message)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId
pid, Exchange -> Message
forall a. Serializable a => a -> Message
unsafeWrapMessage Exchange
ex)

-- | Defines a /router/ that can be used in a supervision tree.
supervisedRouter :: Bindable k
                 => RelayType
                 -> BindingSelector k
                 -> SupervisorPid
                 -> Process Exchange
supervisedRouter :: forall k.
Bindable k =>
RelayType -> BindingSelector k -> ProcessId -> Process Exchange
supervisedRouter RelayType
t BindingSelector k
sel ProcessId
spid =
  RelayType -> BindingSelector k -> Process (Router k)
forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process (Router k)
routerT RelayType
t BindingSelector k
sel Process (Router k)
-> (Router k -> Process Exchange) -> Process Exchange
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Router k
t' -> Router k -> ProcessId -> Process Exchange
forall s. ExchangeType s -> ProcessId -> Process Exchange
startSupervised Router k
t' ProcessId
spid

routerT :: Bindable k
        => RelayType
        -> BindingSelector k
        -> Process (Router k)
routerT :: forall k.
Bindable k =>
RelayType -> BindingSelector k -> Process (Router k)
routerT RelayType
t BindingSelector k
s = do
  Router k -> Process (Router k)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Router k -> Process (Router k)) -> Router k -> Process (Router k)
forall a b. (a -> b) -> a -> b
$ ExchangeType { name :: String
name        = String
"Router"
                        , state :: State k
state       = HashMap k (HashSet ProcessId)
-> BindingSelector k -> RelayType -> State k
forall k.
HashMap k (HashSet ProcessId)
-> BindingSelector k -> RelayType -> State k
State HashMap k (HashSet ProcessId)
forall k v. HashMap k v
Map.empty BindingSelector k
s RelayType
t
                        , configureEx :: State k -> Message -> Process (State k)
configureEx = State k -> Message -> Process (State k)
forall k. Bindable k => State k -> Message -> Process (State k)
apiConfigure
                        , routeEx :: State k -> Message -> Process (State k)
routeEx     = State k -> Message -> Process (State k)
forall k. Bindable k => State k -> Message -> Process (State k)
apiRoute
                        }

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- | Add a binding (for the calling process) to a 'messageKeyRouter' exchange.
bindKey :: String -> Exchange -> Process ()
bindKey :: String -> Exchange -> Process ()
bindKey String
k Exchange
ex = do
  ProcessId
self <- Process ProcessId
P.getSelfPid
  Exchange -> (ProcessId, Binding) -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
configureExchange Exchange
ex (ProcessId
self, String -> Binding
BindKey String
k)

-- | Add a binding (for the calling process) to a 'headerContentRouter' exchange.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
bindHeader :: String -> String -> Exchange -> Process ()
bindHeader String
n String
v Exchange
ex = do
  ProcessId
self <- Process ProcessId
P.getSelfPid
  Exchange -> (ProcessId, Binding) -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
configureExchange Exchange
ex (ProcessId
self, String -> String -> Binding
BindHeader String
v String
n)

-- | Send a 'Serializable' message to the supplied 'Exchange'. The given datum
-- will be converted to a 'Message', with the 'key' set to @""@ and the
-- 'headers' to @[]@.
--
-- The routing behaviour will be dependent on the choice of 'BindingSelector'
-- given when initialising the /router/.
route :: Serializable m => Exchange -> m -> Process ()
route :: forall m. Serializable m => Exchange -> m -> Process ()
route = Exchange -> m -> Process ()
forall m. Serializable m => Exchange -> m -> Process ()
post

-- | Send a 'Message' to the supplied 'Exchange'.
-- The routing behaviour will be dependent on the choice of 'BindingSelector'
-- given when initialising the /router/.
routeMessage :: Exchange -> Message -> Process ()
routeMessage :: Exchange -> Message -> Process ()
routeMessage = Exchange -> Message -> Process ()
postMessage

--------------------------------------------------------------------------------
-- Exchage Definition/State & API Handlers                                    --
--------------------------------------------------------------------------------

apiRoute :: forall k. Bindable k
         => State k
         -> Message
         -> Process (State k)
apiRoute :: forall k. Bindable k => State k -> Message -> Process (State k)
apiRoute st :: State k
st@State{HashMap k (HashSet ProcessId)
RelayType
BindingSelector k
bindings :: forall k. State k -> HashMap k (HashSet ProcessId)
selector :: forall k. State k -> BindingSelector k
relayType :: forall k. State k -> RelayType
bindings :: HashMap k (HashSet ProcessId)
selector :: BindingSelector k
relayType :: RelayType
..} Message
msg = do
  k
binding <- BindingSelector k
selector Message
msg
  case k -> HashMap k (HashSet ProcessId) -> Maybe (HashSet ProcessId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup k
binding HashMap k (HashSet ProcessId)
bindings of
    Maybe (HashSet ProcessId)
Nothing -> State k -> Process (State k)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return State k
st
    Just HashSet ProcessId
bs -> HashSet ProcessId -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ HashSet ProcessId
bs (RelayType -> Message -> ProcessId -> Process ()
fwd RelayType
relayType Message
msg) Process () -> Process (State k) -> Process (State k)
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> State k -> Process (State k)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return State k
st
  where
    fwd :: RelayType -> Message -> ProcessId -> Process ()
fwd RelayType
WholeMessage Message
m = Message -> ProcessId -> Process ()
forall a m. (Addressable a, Serializable m) => m -> a -> Process ()
deliver Message
m
    fwd RelayType
PayloadOnly  Message
m = Message -> ProcessId -> Process ()
P.forward (Message -> Message
payload Message
m)

-- TODO: implement 'unbind' ???
-- TODO: apiConfigure currently leaks memory if clients die (we don't cleanup)

apiConfigure :: forall k. Bindable k
             => State k
             -> P.Message
             -> Process (State k)
apiConfigure :: forall k. Bindable k => State k -> Message -> Process (State k)
apiConfigure State k
st Message
msg = do
  State k
-> Message
-> [Message -> Process (Maybe (State k))]
-> Process (State k)
forall a.
a -> Message -> [Message -> Process (Maybe a)] -> Process a
applyHandlers State k
st Message
msg ([Message -> Process (Maybe (State k))] -> Process (State k))
-> [Message -> Process (Maybe (State k))] -> Process (State k)
forall a b. (a -> b) -> a -> b
$ [ \Message
m -> Message
-> ((ProcessId, k) -> Process (State k))
-> Process (Maybe (State k))
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m (State k -> (ProcessId, k) -> Process (State k)
forall {k}.
Hashable k =>
State k -> (ProcessId, k) -> Process (State k)
createBinding State k
st)
                         , \Message
m -> Message
-> (ProcessMonitorNotification -> Process (State k))
-> Process (Maybe (State k))
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m (State k -> ProcessMonitorNotification -> Process (State k)
forall {m :: * -> *} {k}.
(Monad m, Hashable k) =>
State k -> ProcessMonitorNotification -> m (State k)
handleMonitorSignal State k
st)
                         ]
  where
    createBinding :: State k -> (ProcessId, k) -> Process (State k)
createBinding s :: State k
s@State{HashMap k (HashSet ProcessId)
RelayType
BindingSelector k
bindings :: forall k. State k -> HashMap k (HashSet ProcessId)
selector :: forall k. State k -> BindingSelector k
relayType :: forall k. State k -> RelayType
bindings :: HashMap k (HashSet ProcessId)
selector :: BindingSelector k
relayType :: RelayType
..} (ProcessId
pid, k
bind) = do
      case k -> HashMap k (HashSet ProcessId) -> Maybe (HashSet ProcessId)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
Map.lookup k
bind HashMap k (HashSet ProcessId)
bindings of
        Maybe (HashSet ProcessId)
Nothing -> do MonitorRef
_ <- ProcessId -> Process MonitorRef
monitor ProcessId
pid
                      State k -> Process (State k)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (State k -> Process (State k)) -> State k -> Process (State k)
forall a b. (a -> b) -> a -> b
$ State k
s { bindings = newBind bind pid bindings }
        Just HashSet ProcessId
ps -> State k -> Process (State k)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (State k -> Process (State k)) -> State k -> Process (State k)
forall a b. (a -> b) -> a -> b
$ State k
s { bindings = addBind bind pid bindings ps }

    newBind :: p -> p -> HashMap p (HashSet p) -> HashMap p (HashSet p)
newBind p
b p
p HashMap p (HashSet p)
bs = p -> HashSet p -> HashMap p (HashSet p) -> HashMap p (HashSet p)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
Map.insert p
b (p -> HashSet p
forall a. Hashable a => a -> HashSet a
Set.singleton p
p) HashMap p (HashSet p)
bs
    addBind :: p
-> p -> HashMap p (HashSet p) -> HashSet p -> HashMap p (HashSet p)
addBind p
b' p
p' HashMap p (HashSet p)
bs' HashSet p
ps = p -> HashSet p -> HashMap p (HashSet p) -> HashMap p (HashSet p)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
Map.insert p
b' (p -> HashSet p -> HashSet p
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.insert p
p' HashSet p
ps) HashMap p (HashSet p)
bs'

    handleMonitorSignal :: State k -> ProcessMonitorNotification -> m (State k)
handleMonitorSignal s :: State k
s@State{HashMap k (HashSet ProcessId)
RelayType
BindingSelector k
bindings :: forall k. State k -> HashMap k (HashSet ProcessId)
selector :: forall k. State k -> BindingSelector k
relayType :: forall k. State k -> RelayType
bindings :: HashMap k (HashSet ProcessId)
selector :: BindingSelector k
relayType :: RelayType
..} (ProcessMonitorNotification MonitorRef
_ ProcessId
p DiedReason
_) =
      let bs :: HashMap k (HashSet ProcessId)
bs  = HashMap k (HashSet ProcessId)
bindings
          bs' :: HashMap k (HashSet ProcessId)
bs' = (HashMap k (HashSet ProcessId)
 -> k -> HashSet ProcessId -> HashMap k (HashSet ProcessId))
-> HashMap k (HashSet ProcessId)
-> HashMap k (HashSet ProcessId)
-> HashMap k (HashSet ProcessId)
forall a k v. (a -> k -> v -> a) -> a -> HashMap k v -> a
Map.foldlWithKey' (\HashMap k (HashSet ProcessId)
a k
k HashSet ProcessId
v -> k
-> HashSet ProcessId
-> HashMap k (HashSet ProcessId)
-> HashMap k (HashSet ProcessId)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
Map.insert k
k (ProcessId -> HashSet ProcessId -> HashSet ProcessId
forall a. (Eq a, Hashable a) => a -> HashSet a -> HashSet a
Set.delete ProcessId
p HashSet ProcessId
v) HashMap k (HashSet ProcessId)
a) HashMap k (HashSet ProcessId)
bs HashMap k (HashSet ProcessId)
bs
      in State k -> m (State k)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (State k -> m (State k)) -> State k -> m (State k)
forall a b. (a -> b) -> a -> b
$ State k
s { bindings = bs' }