使用AkkaTyped构建响应式应用程序
立即解锁
发布时间: 2025-08-19 00:05:41 订阅数: 9 


Scala 2.13编程实战与进阶
### 使用 Akka Typed 构建响应式应用程序
#### 1. 问题背景与 Actor 监控
在系统运行过程中,可能会出现 Actor 初始化异常,例如 `akka.actor.ActorInitializationException`。当 `Mixer` 出现 `MotorOverheatException` 时,监督者尝试重启该 Actor 四次后放弃并停止了它。由于故障发生在设置块中,该 Actor 既无法接收 `Mix` 命令,也无法接收生命周期事件通知。
此时系统会陷入停滞,因为 `Chef` 仍在等待故障的 `Mixer` 完成工作。为了解决这个问题,我们需要一种机制来通知 `Chef` 哪些 `Mixer` 已终止。Akka Typed 提供了监控机制,具体实现如下:
```scala
val mixers = for (i <- 1 to eggs)
yield context.spawn(Mixer.controlledMix, s"Mixer_$i")
mixers.foreach(mixer => context.watchWith(mixer, BrokenMixer(mixer)))
```
这里,`context.watchWith` 的第一个参数是要监控的 Actor,第二个参数是消息适配器。`BrokenMixer` 消息的定义和处理如下:
```scala
case class BrokenMixer(mixer: ActorRef[Mixer.Mix]) extends Command
def mixing(...): Behavior[Command] = Behaviors.receivePartial {
case (context, BrokenMixer(m)) =>
context.log.warning("Broken mixer detected {}", m)
context.self ! Collect(Dough(0), m)
Behaviors.same
}
```
当检测到子 Actor 终止时,`Chef` 会记录日志并给自己发送一条消息以补偿丢失的工作。
#### 2. Cook、Baker 与相关机制
在准备好面团后,需要 `Cook` 将面团制成生饼干,`Baker` 将生饼干放入烤箱烘烤。`Cook` 的实现较为简单,它只是将面团转换为一定数量的生饼干并发送给管理者。
`Baker` 则更为复杂,它需要一个烤箱。首先,我们使用一个特殊的行为来启动烤箱:
```scala
def turnOvenOn: Behavior[Command] = Behaviors.setup { context =>
val oven = context.spawn(Oven.empty, "Oven")
idle(oven)
}
```
然后定义 `idle` 行为,等待工作:
```scala
def idle(oven: ActorRef[Oven.Command]): Behavior[Command] =
Behaviors.receivePartial {
case (context, BakeCookies(rawCookies, manager)) =>
oven ! Put(rawCookies.count, context.self)
Behaviors.withTimers { timers =>
timers.startSingleTimer(TimerKey, CheckOven, DefaultBakingTime)
baking(oven, manager)
}
}
```
这里使用了 `withTimers` 行为构造函数,它提供了对 `TimerScheduler` 的访问,我们可以使用它来定义周期性和一次性定时器。
此外,由于烤箱容量有限,`Baker` 需要批量烘烤生饼干,因此需要管理一个生饼干队列。我们使用 `stash` 来实现这一点:
```scala
def baking(oven: ActorRef[Oven.Command],
manager: ActorRef[Manager.Command]): Behavior[Command] =
Behaviors.setup[Command] { context =>
val buffer = StashBuffer[Command](capacity = 100)
Behaviors.receiveMessage {
case CheckOven =>
oven ! Extract(context.self)
Behaviors.same
case c: TooManyCookies=>
buffer.stash(BakeCookies(c.raw, manager))
Behaviors.same
case c : BakeCookies =>
buffer.stash(c)
Behaviors.same
case CookiesReady(cookies) =>
manager ! ReceiveReadyCookies(cookies)
buffer.unstashAll(context, idle(oven))
}
}
```
`stash` 会将当前行为无法处理的消息缓冲起来,并在切换到可以处理这些消息的替代行为之前重新播放它们。
#### 3. 管理者行为组合
在定义了面包店的各个工人后,我们需要一个管理者。在 Akka Typed 中,我们可以通过定义每个状态的原子行为,然后根据需要返回适当的行为来实现管理者的功能:
```scala
def waitingForGroceries = receiveMessagePartial[Command] {
case ReceiveGroceries(g) =>
context.log.info("Mixing {}", g)
chef ! Chef.Mix(g, context.self)
waitingForPastry
}
def waitingForPastry = receiveMessagePartial[Command] {
case ReceivePastry(p) =>
context.log.info("Forming {}", p)
cook ! Cook.FormCookies(p, context.self)
waitingForRawCookies
}
```
我们还可以将这些行为组合起来,实现管理者的并行版本:
```scala
def manage(chef: ActorRef[Chef.Command],
cook: ActorRef[Cook.FormCookies],
baker: ActorRef[Baker.Command]): Behavior[Command] =
...
def sendBoyShopping = receiveMessagePartial ...
def waitingForGroceries = receivePartial[Command] {
...
manage(chef, cook, baker)
}
def waitingForPastry = receiveMessagePartial[Command] {
...
manage(chef, cook, baker)
}
def waitingForRawCookies = receiveMessagePartial[Command] {
case ReceiveRawCookies(c) =>
baker ! Baker.BakeCookies(c, context.self)
manage(chef, cook, baker)
}
def waitingForReadyCookies = receiveMessagePartial[Command] {
case ReceiveReadyCookies(c) =>
context.log.info("Done baking cookies: {}", c)
manage(chef, cook, baker)
}
lookupSeller orElse
sendBoyShopping orElse
waitingForGroceries orElse
waitingForPastry orElse
waitingForRawCookies orElse
waitingForReadyCookies
}
```
这样,管理者就能够在任何处理状态下处理每个消息。
#### 4. 集群配置
为了让杂货店作为一个单独的 Actor 系统运行,我们可以使用 Akka Typed 的集群功能。首先,需要在 `build.sbt` 中添加以下依赖:
```scala
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
```
然后在 `application.conf` 中添加以下配置:
```
akka {
actor.provider = "cluster"
remote {
netty.tcp {
```
0
0
复制全文
相关推荐








