上一篇(ch.10)

下一篇(ch.12)

第十一章 流处理系统

  批处理有一个重要的假设,即输入是有界的,会有完成的时刻;

  而流处理,则是将事件视为一种数据管理机制:无界,持续增量处理的方式;

事件流

  输入是字节序列时,先将其解析为事件序列,单个事件或者说记录本质上都是 一个的,独立的,不可变的对象,内含一个墙上时钟的时间戳;

消息系统

  消息系统可以方便向消费者通知新事件,unix 管道和 tcp 连接都受限于一个生产者和一个消费者,而消息系统支持多生产者和单消费者或者多消费者;

  AMQP/JMS 风格的消息代理:代理将单个消息发送给消费者,消费者成功处理后会确认每一条消息,消息被确认后在代理中会被删除,适合作为一种异步 rpc,不需要顺序性和读取历史数据;

  基于日志的消息代理:将分区中的所有消息分配给相同的消费者节点,保证分区内的顺序,分区间可并行发送数据,消费者通过偏移量检查进度,代理通过日志追加将数据保存在磁盘中,方便回溯重新消费;

  不同的消息系统有不一样的实现策略,大体通过解决两个问题的策略区分:

  • 生产者速度大于消费者时的处理策略:
  • 一般三种解决方案,直接丢弃消息,将消息缓存在消息系统中,激活背压(即触发流量控制,阻止生产者发送更多数据,如 tcp 的发送窗口限制);

节点崩溃或者离线时,消息丢失的可能:

  一般选择写入磁盘做持久化,或者副本化;

  需要考虑到持久化和副本化都有额外的开销,要和性能进行平衡选择;

  此处就和批处理系统有不同了,批处理系统可靠性更高,当发生错误时,只需要丢弃掉失败的部分并重新计算即可,但流处理系统直接丢失消息时,可能导致结果计算出错;

生产者和消费者间直接传递消息

  udp 组播,应用于金融行业,速度快,容错需要在应用层中完善;(即生产者必须记住塔发送的数据包,以重发数据包)

  无代理的消息库,通过 tcp 或 ip 多播实现 pub/sub;

  使用不可靠的 udp 消息收集网络中的数据,如 statsD 监控机器数据指标等,这里是近似准确的计数;

  通过 http 和 rpc,生产者可通过消费者提供的 api 去传递消息,回调 url 就是经典的实现;

消息代理

  本质上是针对信息流优化的数据库,有持久化数据,即缓存消息的能力,生产者和消费者作为客户端连接到消息队列的服务端,即解耦了生产者消费者,消费者以异步的方式运行,如果存在队列积压,会有明显的延迟;

数据库和消息队列的对比

  数据库会保留数据直到明确要求删除,而消息队列的持久化是有限期的(如 kafka 默认存7天),或者当消息被消费后,没有消费者需要的消息也可能被直接删除;

  如果消息队列的数据挤压过多,导致数据被换入磁盘,那么吞吐量会进一步下降;

  数据库有查询机制,但消息队列是以 topic 接收数据,方法不同但本质上确实是:客户端获得需要的数据;

  消息队列有通知的机制,而数据库需要二次查询;

多个消费者

  负载均衡式(消息交给一个消费者处理,即多个消费者消费同一个 topic) ,扇出式(消息被广播给所有订阅的消费者);

ack和重传机制

  (kafka) 一般消息队列支持自动 ack 和手动 ack,当收到数据后,消费者可自动立刻返回ack或者当处理完该消息后再手动返回 ack;

  如果消费者断开连接,消息队列此时无法确定该消息是否被正确消费了,需要引入原子提交协议(或者 ack = all ) ,否则会有重复消费的风险;

  当使用负载均衡模式时,很明显有顺序性的问题,当消息 a 被消费失败后,消息b已经被开始消费了,消息 a 只能重新被发到新消费者上消费,原本的 ab 变成 ba 了;(kafka 用消费者组解决这个问题,同一分区的主题同一时间内只能有一个消费者消费)

基于日志的消息存储

  即生产者将消息追加到日志的末尾以发送消息,消费者依次读取日志来接收消息,当读到日志末尾后,就等待新消息被追加的通知,如 unix 工具 tail -f;

  以此将所有消息写入磁盘,通过多台机器分区,可实现百万级别的吞吐量,通过复制消息实现容错;

  单个磁盘所能提供的吞吐量有限,所以可以采取对日志分区,不同节点负责不同的分区,每个分区都成为独立的日志,如 kafka 的分区内保证有序;

  每个消息都有一个偏移量,以保证分区内的顺序性,偏移量也类似于序列号,当消费者组的节点消费失败时,同组的另一个节点会从该偏移量开始重新消费;

  通过偏移量,消息队列也方便从自定义的历史时刻开始重新消费;

数据库和流

  对于数据库,将写入对象可看做事件流,而数据库底层也有日志机制实现节点间的复制(如 mysql 的 binlog,redis 的环形缓存区等),通常不会暴露其高级 api 供用户使用;

  故引出 CDC(change data capture),变更数据捕获,即 cdc 负责记录写入数据库的所有更改,此时连续写入的事件(顺序),也可看作事件流,cdc 可以将变更对外提供,

  如 canal 就是基于 mysqlcdc,原理是伪装成 mysql 节点的从节点,通过订阅 binlog 获取变更数据;

  mongoriver 读取 mongodb 的 oplog 等;

  但很明显,基于这种方式的同步一样有复制滞后的问题;

  并且,日志(写入事件)不可能被允许一直增长,所以基本使用日志的应用基本都需要支持快照+日志压缩的机制,在后台通过只保留数据的最新更改实现日志压缩,并且通过快照实现 checkpoint 机制,当节点过于落后时,不是慢慢通过日志追赶,而是直接应用快照作为自己最新的状态,在此基础之上再应用增量的日志;

事件溯源

  与 cdc 类似在于使用复制状态机(保存修改操作)而不是状态转移复制(保存数据);

而最大区别在于:

  cdc 是低级别的数据修改操作进行的;

  而事件溯源是按照一个逻辑事件不可变单位进行的,是应用程序级别发生的事件,而不是低级别的数据更改,仅支持追加,不能更新或者删除事件;

  正常使用时,需要利用事件重建系统的状态;

  因为低级别的数据更改可能导致数据无法恢复,无法追踪命令的生命周期,而通过不可变的事件追加,只需要向前回溯找到对应的事件即可恢复;

  事件溯源需要区分命令和事件,用户的第一个请求到达时,它最初是一个命令,需要验证是否符合完整性条件,满足则生成对应持久且不可变的事件;

  相同的事件可以派生出多个视图

并发控制:单分区的情况下,通过事件串行化解决;

流处理

总结上文,讨论了:

  • 流的来源:用户活动事件,传感器,数据库写操作;
  • 流的传输方式:直接传递,通过消息代理和事件日志;

获取流后,主要应用场景有三种:

  1. 将事件(对应的数据)写入数据库,如 sql,nosql,搜索等存储系统;
  2. 将事件推送给用户,直接通知用户或者将事件传入实时仪表盘以进行可视化;
  3. 可以组合流,即将此时的输出流作为输入流再次进行处理,等于形成一个流水线,最终以1和2的形式获取结果;(派生流)

复杂事件处理

匹配用户感兴趣的事件;

  cep,即处理 查询 和 数据 的关系时,和数据库相反,数据库的数据是持久化的,查询是临时的,而 cep 的查询是持久化的,会通过流数据匹配,也就是数据是临时的;

流分析

  类似于 cep,但是侧重点在于统计指标等,对事件本身不关心,关心整体的特征;

流上搜索

  基于一定的条件 监控新事件的发生,如订阅频道,更新视频后用户会收到通知,如es的过滤器功能;

流的时间功能

通常记录三种时钟:

  1. 根据设备的时钟,记录 事件发生 的时间;
  2. 根据设备的时钟,记录将 事件发送到服务器 的时间;
  3. 根据服务器的时间,记录 服务器收到事件 的时间;

窗口类型:

  • 轮转窗口:即固定窗口,硬按照时间截断窗口;
  • 跳跃窗口:即有滑动步长和间隔的滑动窗口,如 1~5,2~6 以此类推;
  • 滑动窗口:没有间隔或者步长,单纯右移的同时左缩;
  • 会话窗口:即发生特点事件的时间窗口,比如用户登录后的执行窗口,当用户处于非活动状态或者离线时截止;

流的join

流和流join

  需要统计一个逻辑:由多个事件组成,如用户查询网页后对什么结果感兴趣(点击进去);

  此时流处理系统就可以 join 这两个事件,查询和对应点击,通过相同的会话 id 和一定的时间限制 join;

流和表join

  即需要根据流 join 到表里,如使用实时流 更新用户的行为数据,

  需要注意如果通过查询远程数据库获取表,那么可能导致数据库过载, 最好获取数据库副本到流处理系统本地,并通过捕获数据变更 cdc,以获取表的增量信息;

表和表join:   如维护两张表,推文和关注者的缓存,当新的推文(本身是数据行)到达时,需要更新谁的时间线,一般通过推文作者 id 和每个用户的关注者 id 集合 join;

  可通过版本号维护事件流的顺序,但同时 不同标识符的 join 记录需要在表中保留,故日志压缩不能很好工作?(它的意思是需要重新处理流时,所以不好直接压缩日志?但即使没有标识符,不也不能直接压缩?)

流处理的容错

  批处理容错很简单,核心就是通过类似血统图,找到执行失败的任务(失败的任务可直接丢掉),并且向上寻找重新开始的任务即可;

维批处理和校验点

  将流分为多个小块,即多个小批处理,如 spark streaming,而 flink 通过定期生成 checkpoint 并将其写入持久化存储,流处理失败后,从检查点开始,重新计算;

  需要注意,如果输出离开了流处理系统,则流处理系统失去对数据的控制权,可能发生重复生产的问题;

原子提交:即需要确保仅当数据处理成功后,才将其输出流处理系统到下游服务或者mq;

幂等性:一方面可以靠操作本身的幂等性,如赋值操作,另一方面可以通过额外的元数据实现,如 kafka 的偏移量,通过偏移量分辨重复的操作;

流处理系统的故障恢复

  如将状态保存在远程数据库,但每次消息都要查询数据库,开销过大;

  另一种方法是将状态保存在本地,定期进行复制;

  如 flink 定期对操作进行快照,写入 hdfs 等持久性存储中;

  kafka 通过将状态更改发到具有日志压缩功能的专用 kafka 主题以保存状态的副本,类似于 cdc;