Akka-CQRS(6)- read-side
前面我们全面介绍了在akka-cluster环境下实现的CQRS写端write-side。简单来说就是把发生事件描述作为对象严格按发生时间顺序写入数据库。这些事件对象一般是按照二进制binary方式如blob存入数据库的。cassandra-plugin的表结构如下:
CREATE KEYSPACE IF NOT EXISTS akka
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
CREATE TABLE IF NOT EXISTS akka.messages (
used boolean static,
persistence_id text,
partition_nr bigint,
sequence_nr bigint,
timestamp timeuuid,
timebucket text,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
message blob,
tags set<text>,
PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket))
WITH gc_grace_seconds =864000
AND compaction = {
'class' : 'SizeTieredCompactionStrategy',
'enabled' : true,
'tombstone_compaction_interval' : 86400,
'tombstone_threshold' : 0.2,
'unchecked_tombstone_compaction' : false,
'bucket_high' : 1.5,
'bucket_low' : 0.5,
'max_threshold' : 32,
'min_threshold' : 4,
'min_sstable_size' : 50
};
事件对象是存放在event里的,是个blob类型字段。下面是个典型的写动作示范:
val receiveCommand: Receive = {
case Cmd(data) =>
persist(ActionGo) { event =>
updateState(event)
}
}
这些事件描述的存写即写这个ActionGo时不会影响到实际业务数据状态。真正发生作用,改变当前业务数据状态的是在读端read-side。也就是说在另一个线程里有个程序也按时间顺序把这些二进制格式的对象读出来、恢复成某种结构如ActionGo类型、然后按照结构内的操作指令对业务数据进行实际操作处理,这时才会产生对业务数据的影响。做个假设:如果这些事件不会依赖时间顺序的话是不是可以偷偷懒直接用一种pub/sub模式把reader放在订阅subscriber端,如下:
//写端
import DistributedPubSubMediator.Publish
val mediator = DistributedPubSub(context.system).mediator
val receiveCommand: Receive = {
case Cmd(data) =>
persist(DataUpdated) { event =>
updateState(event)
mediator ! Publish(persistentId, event,sendOneMessageToEachGroup = true)
}
}
//读端
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(persistentId, self)
def receive = {
case DataUpdated: Event ⇒
updateDataTables()
}
这种pub/sub模式的特点是消息收发双方耦合度非常松散,但同时也存在订阅方sub即reader十分难以控制的问题,而且可以肯定的是订阅到达消息无法保证是按发出时间顺序接收的,我们无法控制akka传递消息的过程。因为业务逻辑中一个动作的发生时间顺序往往会对周围业务数据产生不同的影响,所以现在只能考虑事件源event-sourcing这种模式了。es方式的CQRS是通过数据库表作为读写间隔实现写端程序和读端程序的分离。写端只管往数据库写数据操作指令,读端从同一数据库位置读出指令进行实质的数据处理操作,所以读写过程中会产生一定的延迟,读端需要不断从数据库抽取pull事件。而具体pull的时段间隔如何设定也是一个比较棘手的问题。无论如何,akka提供了Persistence-Query作为一种CQRS读端工具。我们先从一个简单的cassandra-persistence-query用例开始:
// obtain read journal by plugin id
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach { pack =>
updateDatabase(pack.event)
}
eventsByPersistenceId(...)构建了一个akka-stream的Source[EventEnvelope,_]。这个EventEnvelope类定义如下:
/**
* Event wrapper adding meta data for the events in the result stream of
* [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
*/
final case class EventEnvelope(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any)
上面这个event字段就是从数据库读取的事件对象。EventEnvelope是以流元素的形式从数据库中提出。eventsByPersistenceId(...)启动了一个数据流,然后akka-persistence-query会按refresh-interval时间间隔重复运算这个流stream。refresh-interval可以在配置文件中设置,如下面的cassandra-plugin配置:
cassandra-query-journal {
# Implementation class of the Cassandra ReadJournalProvider
class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"
# Absolute path to the write journal plugin configuration section
write-plugin = "cassandra-journal"
# New events are retrieved (polled) with this interval.
refresh-interval = 3s
...
}
以上描述的是一种接近实时的读取模式。一般来讲,为了实现高效、安全的事件存写,我们会尽量简化事件结构,这样就会高概率出现一个业务操作单位需要多个事件来描述,那么如果在完成一项业务操作单元的所有事件存写后才开始读端的动作不就简单多了吗?而且还比较容易控制。虽然这样会造成某种延迟,但如果以业务操作为衡量单位,这种延迟应该是很正常的,可以接受的。现在每当完成一项业务的所有事件存写后在读端一次性成批把事件读出来然后进行实质的数据操作,应当可取。akka-persistence-query提供了下面这个函数:
/**
* Same type of query as `eventsByPersistenceId` but the event stream
* is completed immediately when it reaches the end of the "result set". Events that are
* stored after the query is completed are not included in the event stream.
*/
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ...
我们可以run这个stream把数据读入一个集合里,然后可以在任何一个线程里用这个集合演算业务逻辑(如我们前面提到的写端状态转变维护过程),可以用router/routee模式来实现一种在集群节点中负载均衡式的分配reader-actor作业节点。
下一篇准备对应前面的CQRS Writer Actor 示范里的akka-cluster-pos进行rCQRS-Reader-Actor示范。
- 微信文件微起底
- Go语言TCP Socket编程--1
- Go语言TCP Socket编程--2
- 服务器 数据库设计技巧--1
- CVE-2015-0235:Linux glibc高危漏洞的检测及修复方法
- zabbix监控在lnmp环境下编译安装小记
- 【重磅】百度开源分布式深度学习平台,挑战TensorFlow (教程)
- WordPress评论ajax动态加载,解决静态缓存下评论不更新问题
- WordPress显示访客UA信息:Show UserAgent纯代码轻度汉化版
- WordPress开启颜色评论但不造成XSS漏洞的小方法
- WordPress强迫症技巧:让文章(ID)地址完美连续(障眼法)
- iOS内存管理:从MRC到ARC实践
- MySQL错误修复:Table xx is marked as crashed and last (automatic?) repair failed
- PHP跨站脚本攻击(XSS)漏洞修复方法(一)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 开源 - Java接口API授权认证与规范
- 亿及流量多级缓存 - 客户端缓存
- 亿及流量多级缓存 - 一致性哈希负载均衡与模板渲染
- 关于友情链接或者其他外部链接的建议
- 总结Js方法工具类库,总有你需要的方法
- [docker]安装Mysql
- [Centos7]linux运行django项目报错no module named _ssl
- [Centos7]在非标准端口上运行SSH
- [Centos7]安装及配置bind(DNS服务)
- [Centos7.2]关于crontab报错
- [Centos7.2]关于升级python后防火墙无法启动
- [Centos7]关于限制IP通过ssh登陆
- Apache安装SSL证证书
- 打卡群刷题总结0717——不同路径 II
- [Centos7.2]Django挂载后台运行