第十一章 流处理系统
批处理有一个重要的假设,即输入是有界的,会有完成的时刻;
而流处理,则是将事件视为一种数据管理机制:无界,持续增量处理的方式;
事件流
输入是字节序列时,先将其解析为事件序列,单个事件或者说记录本质上都是 一个小
的,独立
的,不可变
的对象,内含一个墙上时钟的时间戳;
消息系统
消息系统可以方便向消费者通知新事件,unix 管道和 tcp 连接都受限于一个生产者和一个消费者,而消息系统支持多生产者和单消费者或者多消费者;
AMQP/JMS 风格的消息代理:代理将单个消息发送给消费者,消费者成功处理后会确认每一条消息,消息被确认后在代理中会被删除,适合作为一种异步 rpc,不需要顺序性和读取历史数据;
基于日志的消息代理:将分区中的所有消息分配给相同的消费者节点,保证分区内的顺序,分区间可并行发送数据,消费者通过偏移量检查进度,代理通过日志追加将数据保存在磁盘中,方便回溯重新消费;
不同的消息系统有不一样的实现策略,大体通过解决两个问题的策略区分:
- 生产者速度大于消费者时的处理策略:
- 一般三种解决方案,直接丢弃消息,将消息缓存在消息系统中,激活背压(即触发流量控制,阻止生产者发送更多数据,如 tcp 的发送窗口限制);
节点崩溃或者离线时,消息丢失的可能:
一般选择写入磁盘做持久化,或者副本化;
需要考虑到持久化和副本化都有额外的开销,要和性能进行平衡选择;
此处就和批处理系统有不同了,批处理系统可靠性更高,当发生错误时,只需要丢弃掉失败的部分并重新计算即可,但流处理系统直接丢失消息时,可能导致结果计算出错;
生产者和消费者间直接传递消息
udp 组播,应用于金融行业,速度快,容错需要在应用层中完善;(即生产者必须记住塔发送的数据包,以重发数据包)
无代理的消息库,通过 tcp 或 ip 多播实现 pub/sub;
使用不可靠的 udp 消息收集网络中的数据,如 statsD 监控机器数据指标等,这里是近似准确的计数;
通过 http 和 rpc,生产者可通过消费者提供的 api 去传递消息,回调 url 就是经典的实现;
消息代理
本质上是针对信息流优化的数据库,有持久化数据,即缓存消息的能力,生产者和消费者作为客户端连接到消息队列的服务端,即解耦了生产者消费者,消费者以异步的方式运行,如果存在队列积压,会有明显的延迟;
数据库和消息队列的对比
数据库会保留数据直到明确要求删除,而消息队列的持久化是有限期的(如 kafka 默认存7天),或者当消息被消费后,没有消费者需要的消息也可能被直接删除;
如果消息队列的数据挤压过多,导致数据被换入磁盘,那么吞吐量会进一步下降;
数据库有查询机制,但消息队列是以 topic 接收数据,方法不同但本质上确实是:客户端获得需要的数据;
消息队列有通知的机制,而数据库需要二次查询;
多个消费者:
负载均衡式(消息交给一个消费者处理,即多个消费者消费同一个 topic) ,扇出式(消息被广播给所有订阅的消费者);
ack和重传机制:
(kafka) 一般消息队列支持自动 ack 和手动 ack,当收到数据后,消费者可自动立刻返回ack或者当处理完该消息后再手动返回 ack;
如果消费者断开连接,消息队列此时无法确定该消息是否被正确消费了,需要引入原子提交协议(或者 ack = all ) ,否则会有重复消费的风险;
当使用负载均衡模式时,很明显有顺序性的问题,当消息 a 被消费失败后,消息b已经被开始消费了,消息 a 只能重新被发到新消费者上消费,原本的 ab 变成 ba 了;(kafka 用消费者组解决这个问题,同一分区的主题同一时间内只能有一个消费者消费)
基于日志的消息存储
即生产者将消息追加到日志的末尾以发送消息,消费者依次读取日志来接收消息,当读到日志末尾后,就等待新消息被追加的通知,如 unix 工具 tail -f;
以此将所有消息写入磁盘,通过多台机器分区,可实现百万级别的吞吐量,通过复制消息实现容错;
单个磁盘所能提供的吞吐量有限,所以可以采取对日志分区,不同节点负责不同的分区,每个分区都成为独立的日志,如 kafka 的分区内保证有序;
每个消息都有一个偏移量,以保证分区内的顺序性,偏移量也类似于序列号,当消费者组的节点消费失败时,同组的另一个节点会从该偏移量开始重新消费;
通过偏移量,消息队列也方便从自定义的历史时刻开始重新消费;
数据库和流
对于数据库,将写入对象可看做事件流,而数据库底层也有日志机制实现节点间的复制(如 mysql 的 binlog,redis 的环形缓存区等),通常不会暴露其高级 api 供用户使用;
故引出 CDC(change data capture),变更数据捕获,即 cdc
负责记录写入数据库的所有更改,此时连续写入的事件(顺序),也可看作事件流,cdc
可以将变更对外提供,
如 canal
就是基于 mysql
的 cdc
,原理是伪装成 mysql
节点的从节点,通过订阅 binlog 获取变更数据;
mongoriver 读取 mongodb 的 oplog 等;
但很明显,基于这种方式的同步一样有复制滞后的问题;
并且,日志(写入事件)不可能被允许一直增长,所以基本使用日志的应用基本都需要支持快照+日志压缩的机制,在后台通过只保留数据的最新更改实现日志压缩,并且通过快照实现 checkpoint 机制,当节点过于落后时,不是慢慢通过日志追赶,而是直接应用快照作为自己最新的状态,在此基础之上再应用增量的日志;
事件溯源
与 cdc 类似在于使用复制状态机(保存修改操作)而不是状态转移复制(保存数据);
而最大区别在于:
cdc 是低级别的数据修改操作进行的;
而事件溯源是按照一个逻辑事件为不可变单位进行的,是应用程序级别发生的事件,而不是低级别的数据更改,仅支持追加,不能更新或者删除事件;
正常使用时,需要利用事件重建系统的状态;
因为低级别的数据更改可能导致数据无法恢复,无法追踪命令的生命周期,而通过不可变的事件追加,只需要向前回溯找到对应的事件即可恢复;
事件溯源需要区分命令和事件,用户的第一个请求到达时,它最初是一个命令,需要验证是否符合完整性条件,满足则生成对应持久且不可变的事件;
相同的事件可以派生出多个视图
并发控制:单分区的情况下,通过事件串行化解决;
流处理
总结上文,讨论了:
- 流的来源:用户活动事件,传感器,数据库写操作;
- 流的传输方式:直接传递,通过消息代理和事件日志;
获取流后,主要应用场景有三种:
- 将事件(对应的数据)写入数据库,如 sql,nosql,搜索等存储系统;
- 将事件推送给用户,直接通知用户或者将事件传入实时仪表盘以进行可视化;
- 可以组合流,即将此时的输出流作为输入流再次进行处理,等于形成一个流水线,最终以1和2的形式获取结果;(派生流)
复杂事件处理
匹配用户感兴趣的事件;
cep,即处理 查询 和 数据 的关系时,和数据库相反,数据库的数据是持久化的,查询是临时的,而 cep 的查询是持久化的,会通过流数据匹配,也就是数据是临时的;
流分析
类似于 cep,但是侧重点在于统计指标等,对事件本身不关心,关心整体的特征;
流上搜索
基于一定的条件 监控新事件的发生,如订阅频道,更新视频后用户会收到通知,如es的过滤器功能;
流的时间功能
通常记录三种时钟:
- 根据设备的时钟,记录 事件发生 的时间;
- 根据设备的时钟,记录将 事件发送到服务器 的时间;
- 根据服务器的时间,记录 服务器收到事件 的时间;
窗口类型:
- 轮转窗口:即固定窗口,硬按照时间截断窗口;
- 跳跃窗口:即有滑动步长和间隔的滑动窗口,如 1~5,2~6 以此类推;
- 滑动窗口:没有间隔或者步长,单纯右移的同时左缩;
- 会话窗口:即发生特点事件的时间窗口,比如用户登录后的执行窗口,当用户处于非活动状态或者离线时截止;
流的join
流和流join:
需要统计一个逻辑:由多个事件组成,如用户查询网页后对什么结果感兴趣(点击进去);
此时流处理系统就可以 join 这两个事件,查询和对应点击,通过相同的会话 id 和一定的时间限制 join;
流和表join:
即需要根据流 join 到表里,如使用实时流 更新用户的行为数据,
需要注意如果通过查询远程数据库获取表,那么可能导致数据库过载, 最好获取数据库副本到流处理系统本地,并通过捕获数据变更 cdc,以获取表的增量信息;
表和表join: 如维护两张表,推文和关注者的缓存,当新的推文(本身是数据行)到达时,需要更新谁的时间线,一般通过推文作者 id 和每个用户的关注者 id 集合 join;
可通过版本号维护事件流的顺序,但同时 不同标识符的 join 记录需要在表中保留,故日志压缩不能很好工作?(它的意思是需要重新处理流时,所以不好直接压缩日志?但即使没有标识符,不也不能直接压缩?)
流处理的容错
批处理容错很简单,核心就是通过类似血统图,找到执行失败的任务(失败的任务可直接丢掉),并且向上寻找重新开始的任务即可;
维批处理和校验点
将流分为多个小块,即多个小批处理,如 spark streaming,而 flink 通过定期生成 checkpoint 并将其写入持久化存储,流处理失败后,从检查点开始,重新计算;
需要注意,如果输出离开了流处理系统,则流处理系统失去对数据的控制权,可能发生重复生产的问题;
原子提交:即需要确保仅当数据处理成功后,才将其输出流处理系统到下游服务或者mq;
幂等性:一方面可以靠操作本身的幂等性,如赋值操作,另一方面可以通过额外的元数据实现,如 kafka 的偏移量,通过偏移量分辨重复的操作;
流处理系统的故障恢复
如将状态保存在远程数据库,但每次消息都要查询数据库,开销过大;
另一种方法是将状态保存在本地,定期进行复制;
如 flink 定期对操作进行快照,写入 hdfs 等持久性存储中;
kafka 通过将状态更改发到具有日志压缩功能的专用 kafka 主题以保存状态的副本,类似于 cdc;