Akka 指南 之「第 5 部分: 查询设备组」
第 5 部分: 查询设备组
依赖
在你项目中添加如下依赖:
<!-- Maven -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.19</version>
</dependency>
<!-- Gradle -->
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.19'
}
<!-- sbt -->
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.19"
简介
到目前为止,我们所看到的对话模式很简单,因为它们要求 Actor 保持很少或根本就没有状态。明确地:
- 设备 Actor 返回一个不需要状态更改的读取
- 记录温度,更新单个字段
- 设备组 Actor 通过添加或删除映射中的条目来维护组成员身份
在本部分中,我们将使用一个更复杂的示例。由于房主会对整个家庭的温度感兴趣,我们的目标是能够查询一个组中的所有设备参与者。让我们先研究一下这样的查询 API 应该如何工作。
处理可能的情况
我们面临的第一个问题是,一个组的成员是动态的。每个传感器设备都由一个可以随时停止的 Actor 表示。在查询开始时,我们可以询问所有现有设备 Actor 当前的温度。但是,在查询的生命周期中:
- 设备 Actor 可能会停止工作,无法用温度读数做出响应。
- 一个新的设备 Actor 可能会启动,并且不会包含在查询中,因为我们不知道它。
这些问题可以用许多不同的方式来解决,但重要的是要解决所期望的行为。以下工作对于我们的用例是很有用的:
- 当查询到达时,组 Actor 将获取现有设备 Actor 的快照(
snapshot
),并且只向这些 Actor 询问温度。 - 查询到达后启动的 Actor 可以被忽略。
- 如果快照中的某个 Actor 在查询期间停止而没有应答,我们将向查询消息的发送者报告它停止的事实。
除了设备 Actor 动态地变化之外,一些 Actor 可能需要很长时间来响应。例如,它们可能被困在一个意外的无限循环中,或者由于一个 bug 而失败,并放弃我们的请求。我们不希望查询无限期地继续,因此在以下任何一种情况下,我们都会认为它是完成的:
- 快照中的所有 Actor 要么已响应,要么确认已停止。
- 我们达到了预定的(
pre-defined
)最后期限。
考虑到这些决定,再加上快照中的设备可能刚刚启动但尚未接收到要记录的温度,我们可以针对温度查询为每个设备 Actor 定义四种状态:
- 它有一个可用的温度:
Temperature
。 - 它已经响应,但还没有可用的温度:
TemperatureNotAvailable
。 - 它在响应之前已停止:
DeviceNotAvailable
。 - 它在最后期限之前没有响应:
DeviceTimedOut
。
在消息类型中汇总这些信息,我们可以将以下代码添加到DeviceGroup
:
public static final class RequestAllTemperatures {
final long requestId;
public RequestAllTemperatures(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondAllTemperatures {
final long requestId;
final Map<String, TemperatureReading> temperatures;
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
this.requestId = requestId;
this.temperatures = temperatures;
}
}
public static interface TemperatureReading {
}
public static final class Temperature implements TemperatureReading {
public final double value;
public Temperature(double value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Temperature that = (Temperature) o;
return Double.compare(that.value, value) == 0;
}
@Override
public int hashCode() {
long temp = Double.doubleToLongBits(value);
return (int) (temp ^ (temp >>> 32));
}
@Override
public String toString() {
return "Temperature{" +
"value=" + value +
'}';
}
}
public enum TemperatureNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceNotAvailable implements TemperatureReading {
INSTANCE
}
public enum DeviceTimedOut implements TemperatureReading {
INSTANCE
}
实现查询功能
实现查询的一种方法是向组设备 Actor 添加代码。然而,在实践中,这可能非常麻烦并且容易出错。请记住,当我们启动查询时,我们需要获取当前设备的快照并启动计时器,以便强制执行截止时间。同时,另一个查询可以到达。对于第二个查询,我们需要跟踪完全相同的信息,但与前一个查询隔离。这将要求我们在查询和设备 Actor 之间维护单独的映射。
相反,我们将实现一种更简单、更优雅的方法。我们将创建一个表示单个查询的 Actor,并代表组 Actor 执行完成查询所需的任务。到目前为止,我们已经创建了属于典型域对象(classical domain
)的 Actor,但是现在,我们将创建一个表示流程或任务而不是实体的 Actor。我们通过保持我们的组设备 Actor 简单和能够更好地隔离测试查询功能而受益。
定义查询 Actor
首先,我们需要设计查询 Actor 的生命周期。这包括识别其初始状态、将要采取的第一个操作以及清除(如果需要)。查询 Actor 需要以下信息:
- 要查询的活动设备 Actor 的快照和 ID。
- 启动查询的请求的 ID(以便我们可以在响应中包含它)。
- 发送查询的 Actor 的引用。我们会直接给这个 Actor 响应。
- 指示查询等待响应的期限。将其作为参数将简化测试。
设置查询超时
由于我们需要一种方法来指示我们愿意等待响应的时间,现在是时候引入一个我们还没有使用的新的 Akka 特性,即内置的调度器(built-in scheduler
)功能了。使用调度器(scheduler
)很简单:
- 我们可以从
ActorSystem
中获取调度器,而ActorSystem
又可以从 Actor 的上下文中访问:getContext().getSystem().scheduler()
。这需要一个ExecutionContext
,它是将执行计时器任务本身的线程池。在我们的示例中,我们通过传入getContext().dispatcher()
来使用与 Actor 相同的调度器。 scheduler.scheduleOnce(time, actorRef, message, executor, sender)
方法将在指定的time
将消息message
调度到将来(future
),并将其发送给 Actor 的ActorRef
。
我们需要创建一个表示查询超时的消息。为此,我们创建了一个没有任何参数的简单消息CollectionTimeout
。scheduleOnce
的返回值是Cancellable
,如果查询及时成功完成,可以使用它取消定时器。在查询开始时,我们需要询问每个设备 Actor 当前的温度。为了能够快速检测那些在ReadTemperature
信息之前停止的设备,我们还将观察每个 Actor。这样,对于那些在查询生命周期中停止的消息,我们就可以得到Terminated
消息,因此我们不需要等到超时时再将这些消息标记为不可用。
综上所述,DeviceGroupQuery
Actor 的代码大致如下:
public class DeviceGroupQuery extends AbstractActor {
public static final class CollectionTimeout {
}
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final Map<ActorRef, String> actorToDeviceId;
final long requestId;
final ActorRef requester;
Cancellable queryTimeoutTimer;
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
this.actorToDeviceId = actorToDeviceId;
this.requestId = requestId;
this.requester = requester;
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
);
}
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
return Props.create(DeviceGroupQuery.class, () -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
}
@Override
public void preStart() {
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
getContext().watch(deviceActor);
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
}
}
@Override
public void postStop() {
queryTimeoutTimer.cancel();
}
}
跟踪 Actor 状态
除了挂起的定时器之外,查询 Actor 还有一个状态方面,它跟踪一组 Actor:已回复、已停止或未回复。跟踪此状态的一种方法是在 Actor 中创建可变字段。另一种方法利用改变 Actor 对消息的响应方式的能力。Receive
是一个可以从另一个函数返回的函数(如果你愿意的话,也可以是对象)。默认情况下,receive
块定义了 Actor 的行为,但在 Actor 的生命周期中可以多次更改它。我们调用context.become(newBehavior)
,其中newBehavior
是任何类型的Receive
。我们将利用此功能跟踪 Actor 的状态。
对于我们的用例:
- 我们不直接定义
receive
,而是委托waitingForReplies
函数来创建Receive
。 waitingForReplies
函数将跟踪两个更改的值:- 已收到响应的
Map
; - 我们还在等待 Actors 响应的
Set
。
- 已收到响应的
我们有三件事要做:
- 我们可以从其中一个设备接收
RespondTemperature
。 - 我们可以为同时被停止的设备 Actor 接收
Terminated
的消息。 - 我们可以达到截止时间(
deadline
)并收到一个CollectionTimeout
消息。
在前两种情况下,我们需要跟踪响应,现在我们将其委托给receivedResponse
方法,稍后我们将讨论该方法。在超时的情况下,我们需要简单地把所有还没有响应的 Actors(集合stillWaiting
的成员)放在DeviceTimedOut
中作为最终响应的状态。然后我们用收集到的结果回复查询提交者,并停止查询 Actor。
要完成此操作,请将以下代码添加到DeviceGroupQuery
源文件中:
@Override
public Receive createReceive() {
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}
public Receive waitingForReplies(
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
Set<ActorRef> stillWaiting) {
return receiveBuilder()
.match(Device.RespondTemperature.class, r -> {
ActorRef deviceActor = getSender();
DeviceGroup.TemperatureReading reading = r.value
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
.orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE);
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
})
.match(Terminated.class, t -> {
receivedResponse(t.getActor(), DeviceGroup.DeviceNotAvailable.INSTANCE, stillWaiting, repliesSoFar);
})
.match(CollectionTimeout.class, t -> {
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
for (ActorRef deviceActor : stillWaiting) {
String deviceId = actorToDeviceId.get(deviceActor);
replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE);
}
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
getContext().stop(getSelf());
})
.build();
}
目前还不清楚我们将如何“改变”repliesSoFar
和stillWaiting
数据结构。需要注意的一点是,waitingForReplies
函数不能直接处理消息。它返回一个Receive
函数来处理消息。这意味着,如果我们使用不同的参数再次调用waitingForReplies
,那么它将返回一个全新的Receive
,该Receive
将使用这些新参数。
我们已经看到了如何通过从receive
的返回来安装(install
)初始化Receive
。例如,为了安装一个新的Receive
,为了记录一个新的回复,我们需要一些机制。此机制是方法context.become(newReceive)
,它将 Actor 的消息处理函数更改为提供的newReceive
函数。可以想象,在开始之前,Actor 会自动调用context.become(receive)
,即安装从receive
返回的Receive
函数。这是另一个重要的观察:处理消息的不是receive
,而是返回一个实际处理消息的Receive
函数。
我们现在必须弄清楚在receivedResponse
中该怎么做。首先,我们需要在repliesSoFar
中记录新的结果,并将 Actor 从stillWaiting
中移除。下一步是检查是否还有其他我们正在等待的 Actors。如果没有,我们将查询结果发送给原始请求者并停止查询 Actor。否则,我们需要更新repliesSoFar
和stillWaiting
结构并等待更多的消息。
在之前的代码中,我们将Terminated
视为隐式响应DeviceNotAvailable
,因此receivedResponse
不需要执行任何特殊操作。但是,还有一个小任务我们仍然需要做。我们可能从设备 Actor 那里接收到正确的响应,但是在查询的生命周期中,它会停止。我们不希望此第二个事件覆盖已收到的响应。换句话说,我们不希望在记录响应之后接收Terminated
。这很容易通过调用context.unwatch(ref)
实现。此方法还确保我们不会接收已经在 Actor 邮箱中的Terminated
事件。多次调用此函数也是安全的,只有第一次调用才会有任何效果,其余的调用将被忽略
通过以上的分析,我们创建receivedResponse
方法为:
public void receivedResponse(ActorRef deviceActor,
DeviceGroup.TemperatureReading reading,
Set<ActorRef> stillWaiting,
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
getContext().unwatch(deviceActor);
String deviceId = actorToDeviceId.get(deviceActor);
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
newStillWaiting.remove(deviceActor);
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
newRepliesSoFar.put(deviceId, reading);
if (newStillWaiting.isEmpty()) {
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
getContext().stop(getSelf());
} else {
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
}
}
在这一点上,我们很自然地会问,使用context.become()
技巧,而不是使repliesSoFar
和stillWaiting
结构成为 Actor 的可变字段(例如,vars
),我们获得了什么?在这个简单的例子中,没有那么多。当你突然有更多的状态时,这种状态保持的价值变得更加明显。由于每个状态可能都有与其自身相关的临时数据,因此将这些数据作为字段保存会污染 Actor 的全局状态,也就是说,不清楚在什么状态下使用了哪些字段。使用参数化的Receive
“工厂”方法,我们可以保持仅与状态相关的数据私有化。使用可变字段而不是context.become()
重写查询仍然是一个很好的练习。但是,建议你熟悉我们在这里使用的解决方案,因为它有助于以更干净和更可维护的方式构造更复杂的 Actor 代码。
现在,我们的查询 Actor 完成了,代码如下:
public class DeviceGroupQuery extends AbstractActor {
public static final class CollectionTimeout {
}
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final Map<ActorRef, String> actorToDeviceId;
final long requestId;
final ActorRef requester;
Cancellable queryTimeoutTimer;
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
this.actorToDeviceId = actorToDeviceId;
this.requestId = requestId;
this.requester = requester;
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
);
}
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
return Props.create(DeviceGroupQuery.class, () -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
}
@Override
public void preStart() {
for (ActorRef deviceActor : actorToDeviceId.keySet()) {
getContext().watch(deviceActor);
deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
}
}
@Override
public void postStop() {
queryTimeoutTimer.cancel();
}
@Override
public Receive createReceive() {
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}
public Receive waitingForReplies(
Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
Set<ActorRef> stillWaiting) {
return receiveBuilder()
.match(Device.RespondTemperature.class, r -> {
ActorRef deviceActor = getSender();
DeviceGroup.TemperatureReading reading = r.value
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
.orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE);
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
})
.match(Terminated.class, t -> {
receivedResponse(t.getActor(), DeviceGroup.DeviceNotAvailable.INSTANCE, stillWaiting, repliesSoFar);
})
.match(CollectionTimeout.class, t -> {
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
for (ActorRef deviceActor : stillWaiting) {
String deviceId = actorToDeviceId.get(deviceActor);
replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE);
}
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
getContext().
- Java 实现线程死锁
- Java 四种线程池的用法分析
- 深入了解Java对象序列化
- 在Java EE7框架中使用MongoDB
- 用事实说话,成熟的ORM性能不是瓶颈,灵活性不是问题:EF5.0、PDF.NET5.0、Dapper原理分析与测试手记
- iPhone的Wi-Fi芯片漏洞利用POC公布,赶紧更新系统吧
- No.003 Longest Substring Without Repeating Characters
- 【Spark研究】极简 Spark 入门笔记——安装和第一个回归程序
- 通常Java开发人员如何进行数据排序?
- 消息服务框架使用案例之--大文件上传(断点续传)功能
- Java中三种Set类型用法、性能大比拼
- Android基础总结(5)——数据存储,持久化技术
- 如何突破Windows环境限制打开“命令提示符”
- 【Spark研究】Spark之工作原理
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 一个合格的中级前端工程师应该掌握的 20 个 Vue 技巧
- Vue 3 任意传送门——Teleport
- @JsonCreator自定义反序列化函数-JSON框架Jackson精解第5篇
- 不靠谱的 console
- 使用ThreadLocal和ArgumentResolver方便开发
- 【Java8新特性】05 使用Optional取代null
- 在不被spring容器管理的类中使用ApplicationContext应用上下文bean
- Spring boot 在静态类中注入spring组件
- Spring boot 自定义配置文件
- 数据分析 常见技巧和经验总结
- Go by Example 中文版: Base64 编码
- Django3.0+supervisor+uvicorn+nginx进行线上部署
- 前端杂货铺上新
- 短视频系统源代码,实现前置摄像头水平翻转
- linux配置SOCK5代理