| Copyright | (c) Tim Watson 2013 - 2014 |
|---|---|
| License | BSD3 (see the file LICENSE) |
| Maintainer | Tim Watson <[email protected]> |
| Stability | experimental |
| Portability | non-portable (requires concurrency) |
| Safe Haskell | None |
| Language | Haskell2010 |
Control.Distributed.Process.Execution
Description
- Inter-Process Traffic Management
The Execution Framework provides tools for load regulation, workload shedding and remote hand-off. The currently implementation provides only a subset of the plumbing required, comprising tools for event management, mailbox buffering and message routing.
Synopsis
- __remoteTable :: RemoteTable -> RemoteTable
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- active :: Mailbox -> Filter -> Process ()
- createMailbox :: BufferType -> Limit -> Process Mailbox
- deliver :: Mailbox -> Process ()
- monitor :: Mailbox -> Process MonitorRef
- notify :: Mailbox -> Process ()
- resize :: Mailbox -> Integer -> Process ()
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- statistics :: Mailbox -> Process MailboxStats
- data BufferType
- data Delivery = Delivery {}
- data FilterResult
- type Limit = Integer
- data Mailbox
- data MailboxStats = MailboxStats {}
- data NewMail = NewMail !Mailbox !Integer
- bindToBroadcaster :: Exchange -> Process ()
- broadcastClient :: Exchange -> Process (InputStream Message)
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- post :: Serializable a => Exchange -> a -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- startExchange :: ExchangeType s -> Process Exchange
- startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- messageKeyRouter :: RelayType -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- type BroadcastExchange = ExchangeType BroadcastEx
- data Exchange
- data ExchangeType s = ExchangeType {}
- data Message = Message {}
- class (Hashable k, Eq k, Serializable k) => Bindable k
- data Binding
- = BindKey {
- bindingKey :: !String
- | BindHeader {
- bindingKey :: !String
- headerName :: !HeaderName
- | BindNone
- = BindKey {
- type BindingSelector k = Message -> Process k
- type HeaderName = String
- data RelayType
Mailbox Buffering
acceptEverything :: Closure (Message -> Process FilterResult) Source #
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #
A filter that takes a Closure (Message -> Process FilterResult) holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).
active :: Mailbox -> Filter -> Process () Source #
Instructs the mailbox to send a Delivery as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
createMailbox :: BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the calling process.
create = getSelfPid >>= start
deliver :: Mailbox -> Process () Source #
Instructs the mailbox to deliver all pending messages to the owner.
resize :: Mailbox -> Integer -> Process () Source #
Alters the mailbox's limit - this might cause messages to be dropped!
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the supplied ProcessId.
start = spawnLocal $ run
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox Source #
As startMailbox, but suitable for use in supervisor child specs.
statistics :: Mailbox -> Process MailboxStats Source #
Obtain statistics (from/to anywhere) about a mailbox.
data BufferType Source #
Describes the different types of buffer.
Constructors
| Queue | FIFO buffer, limiter drops the eldest message (queue head) |
| Stack | unordered buffer, limiter drops the newest (top) message |
| Ring | FIFO buffer, limiter refuses (i.e., drops) new messages |
Instances
| Show BufferType Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> BufferType -> ShowS # show :: BufferType -> String # showList :: [BufferType] -> ShowS # | |
| Eq BufferType Source # | |
Mail delivery.
Constructors
| Delivery | |
Instances
data FilterResult Source #
Instances
| Binary FilterResult Source # | |||||
| Generic FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| type Rep FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-3i3thiBwSqKDSHlFxSRcIy" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type))) | |||||
Represents the maximum number of messages the internal buffer can hold.
Opaque handle to a mailbox.
Instances
| Binary Mailbox Source # | |
| Addressable Mailbox Source # | |
| Linkable Mailbox Source # | |
| Resolvable Mailbox Source # | |
| Routable Mailbox Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods sendTo :: (Serializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # unsafeSendTo :: (NFSerializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # | |
| Generic Mailbox Source # | |
| Show Mailbox Source # | |
| Eq Mailbox Source # | |
| type Rep Mailbox Source # | |
data MailboxStats Source #
Bundle of statistics data, available on request via
the mailboxStats API call.
Constructors
| MailboxStats | |
Fields | |
Instances
| Binary MailboxStats Source # | |||||
| Generic MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| Show MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> MailboxStats -> ShowS # show :: MailboxStats -> String # showList :: [MailboxStats] -> ShowS # | |||||
| type Rep MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-3i3thiBwSqKDSHlFxSRcIy" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId)))) | |||||
Marker message indicating to the owning process that mail has arrived.
Instances
| Binary NewMail Source # | |||||
| Generic NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
| Show NewMail Source # | |||||
| type Rep NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-3i3thiBwSqKDSHlFxSRcIy" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer))) | |||||
Message Exchanges
bindToBroadcaster :: Exchange -> Process () Source #
broadcastClient :: Exchange -> Process (InputStream Message) Source #
Create a binding to the given broadcast exchange for the calling process
and return an InputStream that can be used in the expect and
receiveWait family of messaging primitives. This form of client interaction
helps avoid cluttering the caller's mailbox with Message data, since the
InputChannel provides a separate input stream (in a similar fashion to
a typed channel).
Example:
is <- broadcastClient ex msg <- receiveWait [ matchInputStream is ] handleMessage (payload msg)
broadcastExchange :: Process Exchange Source #
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange Source #
The ExchangeType of a broadcast exchange. Can be combined with the
startSupervisedRef and startSupervised APIs.
applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a Source #
Utility for custom exchange type authors - evaluates a set of primitive
message handlers from left to right, returning the first which evaluates
to Just a, or the initial e value if all the handlers yield Nothing.
configureExchange :: Serializable m => Exchange -> m -> Process () Source #
Sends an arbitrary Serializable datum to an exchange, for use as a
configuration change - see configureEx for details.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #
post :: Serializable a => Exchange -> a -> Process () Source #
Posts an arbitrary Serializable datum to an exchange. The raw datum is
wrapped in the Message data type, with its key set to "" and its
headers to [].
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #
startExchange :: ExchangeType s -> Process Exchange Source #
Starts an exchange process with the given ExchangeType.
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #
Starts an exchange as part of a supervision tree.
Example: > childSpec = toChildStart $ startSupervisedRef exType
bindHeader :: HeaderName -> String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a headerContentRouter exchange.
bindKey :: String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a messageKeyRouter exchange.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #
A router that matches on a specific (named) header. To bind a client
Process to such an exchange, use the bindHeader function.
route :: Serializable m => Exchange -> m -> Process () Source #
Send a Serializable message to the supplied Exchange. The given datum
will be converted to a Message, with the key set to "" and the
headers to [].
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
routeMessage :: Exchange -> Message -> Process () Source #
Send a Message to the supplied Exchange.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #
Defines a router exchange. The BindingSelector is used to construct
a binding (i.e., an instance of the Bindable type k) for each incoming
Message. Such bindings are matched against bindings stored in the exchange.
Clients of a router exchange are identified by a binding, mapped to
one or more ProcessIds.
The format of the bindings, nature of their storage and mechanism for
submitting new bindings is implementation dependent (i.e., will vary by
exchange type). For example, the messageKeyRouter and headerContentRouter
implementations both use the Binding data type, which can represent a
Message key or a HeaderName and content. As with all custom exchange
types, bindings should be submitted by evaluating configureExchange with
a suitable data type.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #
Defines a router that can be used in a supervision tree.
type BroadcastExchange = ExchangeType BroadcastEx Source #
Opaque handle to an exchange.
Instances
| Binary Exchange Source # | |
| Linkable Exchange Source # | |
| Resolvable Exchange Source # | |
| Generic Exchange Source # | |
| Show Exchange Source # | |
| Eq Exchange Source # | |
| type Rep Exchange Source # | |
data ExchangeType s Source #
Different exchange types are defined using record syntax.
The configureEx and routeEx API functions are called during the exchange
lifecycle when incoming traffic arrives. Configuration messages are
completely arbitrary types and the exchange type author is entirely
responsible for decoding them. Messages posted to the exchange (see the
Message data type) are passed to the routeEx API function along with the
exchange type's own internal state. Both API functions return a new
(potentially updated) state and run in the Process monad.
Constructors
| ExchangeType | |
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
| Message | |
Instances
| Binary Message Source # | |||||
| NFData Message Source # | |||||
| Generic Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal Associated Types
| |||||
| Show Message Source # | |||||
| type Rep Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-3i3thiBwSqKDSHlFxSRcIy" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message)))) | |||||
class (Hashable k, Eq k, Serializable k) => Bindable k Source #
Things that can be used as binding keys in a router.
The binding key used by the built-in key and header based routers.
Constructors
| BindKey | |
Fields
| |
| BindHeader | |
Fields
| |
| BindNone | |
Instances
| Binary Binding Source # | |||||
| NFData Binding Source # | |||||
| Generic Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router Associated Types
| |||||
| Show Binding Source # | |||||
| Eq Binding Source # | |||||
| Hashable Binding Source # | |||||
| type Rep Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-3i3thiBwSqKDSHlFxSRcIy" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type))) | |||||
type BindingSelector k = Message -> Process k Source #
type HeaderName = String Source #
Given to a router to indicate whether clients should
receive Message payloads only, or the whole Message object
itself.
Constructors
| PayloadOnly | |
| WholeMessage |