分布式系统中的不可靠复制问题

在《Designing Data-Intensive Application》一书中为当今大多数的互联网系统下了个定义,即数据密集型系统。在数据密集型系统中我们要应对的是数据的爆炸性增长问题。

为了应对问题我们采用了一些手段,比如 partition、replication,但这些手段,在一些场景或语言受限的情况下会带来一些麻烦的问题。本文主要讲一讲复制方面的问题。

说到复制 replication,可能大多数人第一印象都是存储系统中的复制,比如 MySQL 中基于 binlog 的复制,redis 中基于操作日志的复制,zk/etcd/kafka 中基于特定算法的复制(实质上也是基于 log 的复制),等等。但复制的概念本身实际上是很广的,比如某个流程中我们的下游系统需要上游提供数据,那上游系统拼装结构请求下游,也算得上是一种 复制

在基于交易的这些互联网公司中,订单的状态流是需要在各个系统之间进行复制传播的。一个比较典型的订单状态流可能是下面这样:

如果业务想要对平台上的订单指标进行管理,那么就需要对各个环节的数据进行统计。统计嘛,说着高大上,实际上就是在整个状态流转的过程中挂 trigger。

不过大多数公司的系统不需要这么实时的数据,我们的数据只要能一天一跑,或者一月一跑就行。这种情况下可以把所有的数据聚合任务交给离线系统来写 SQL。但凡事总有例外,有一些业务是需要实时来做的,特别是一些促销类型的系统,比如各大公司的折扣系统,券系统,有实时数据支持的话,可以做到千人千面。

这时候,这些能准实时触发的 trigger 就是必须的了。但是 trigger 本身需要跟着订单的状态流走,一般一个大一些的电商公司的订单流程都会非常复杂,我这里画的图可能也就只是占整体流程的几十分之一,这时候怎么办呢?有几种常见的解法:

  1. 把状态机和对应的统计工作拆分到不同的系统中去完成。
  2. 统计和业务分离。

两者各有优劣,前者的话,业务团队需要关心自己的流程中所产生的指标。这样数据指标会被分散在各个团队内,好处是每个团队对于自己的数据指标负责,所以可以结合业务代码来一起保证数据的完整和准确性。如果公司想要有统一的数据报表,比如看看什么漏斗模型之类的。那可能就会很头痛,需要对齐各个团队产生的数据定义、字段、内容,如果对不齐,那让谁来推进这种事也是很蛋疼的问题。

所以大多数公司选择的都是后者为主,前者为辅的模式。状态机过于复杂的话我也会拆一部分到另外的系统,在必要的时候接受回调即可,比如电商系统中一般都会有独立的 TMS、WMS 系统,关注商品的运输流程(也会有状态流转),在库房内的中转流程(同样有状态机),在内部流程运转完成之后,再统一通知主流程。从而达到复杂性拆解的目的。

统计方面业务就不想关心了,如果字段方面有需求,你可以让我在我的流程中加一些字段,再把这些流程数据以某种方式传给你。对业务来说比较有用的,我就进 MySQL,你自己去 hive 里捞,如果我不太关心的数据,那我就在流程中通过各种 拼装 然后通过 RPC 或者异步的 MQ 消息传给你。

可以看到,这里有一个 拼装 过程。实际上在业务真的不关心的情况下,这个 拼装 过程往往也是不想帮助下游来做的。这也是大公司里上下游经常撕逼的话题之一。

从合理性的角度来讲,这件事情确实应该由数据消费方来做,因为消费方的指标增加理论上是不受控的,随着指标越来越多,需要的字段也自然就会越来越多。如果每次我们有新的统计需求,都需要去改主流程的代码,那想必是不合适的。所以在上下游之间,偏下游侧,应该有一个 ETL 层。这里的 ETL 是基本实时的,和离线里的概念稍有不同。本质一般是通过 RPC 调用其它系统,从而补全缺失字段。就像下面这张图一样:


标上颜色的是数据主要流向,同时标蓝色表示这两个小模块都由下游负责。不过这是我们理想的状态,实际上现状是 ETL 过程很有可能是在主流程业务系统中完成的,对于该系统来说,订单的状态变动记录,以及当时的快照可能并不是它所需要关心的内容,所以每当变动时,它要做的就是 CALLCALLCALL,然后把消息通过 MQ 发给 kafka。至于 kafka 后面是谁,一般情况下是不关心的。从设计上来讲,这样已经达到了一半的解耦目的。所以现状是这样的:

这次 ETL 被归入到上游系统了,我们在数据没有拼对的时候可以把锅甩给上游(误

当然了,我们脑筋也不要这么死,根据大名鼎鼎的康威定律,项目架构取决于公司的架构,既然存在了,就说明从当时的公司架构来讲这么做是最合理的。如果只讲功能完备性的话,这两种方式都是可以达到目的的。从责任划分的角度上来讲,你可能会觉得把 ETL 交给下游更合理一些,不过凡事也无绝对。如果下游数据相关的系统有多个,那么每个系统都需要干一遍 ETL 的事情,由上游来做这件事情能达成一定程度的“收敛”目的。如果 ETL 全由下游来做,那对于提供补充字段的系统来说,压力可能就不是那么容易承受的了,他们面临的情况可能是当前的 QPS * N。所以凡事无绝对。

上面的两幅图是基于消息和 MQ 的,有时基于延迟或数据的准确性考虑,我们也会要求上游直接通过 RPC,把数据结果告诉我们。整体的流程也差不多,就不画图了。

在计算系统中,有一些指标是需要公开给用户的,特别是那些提供平台让用户赚钱的系统,为了对优质用户和普通用户区别对待,需要对其各项业务指标进行统计,同时为了程序正义,必须要将你的规则,计算结果,关联数据都展示给用户,以提供异常情况下的用户申诉。这种情况下对数据的准确性会有更高的要求。

为了满足要求,除了前面提到的 ETL 模块究竟该归谁来管的问题,我们还应该考虑另一个更麻烦的问题:请求失败。在分布式系统中,失败实在是常见不过,网络抖一抖,磁盘抖一抖,机柜抖一抖,都可能导致请求失败。失败的表现最多是超时,其次是高并发时带来的各种 connection refused,address already in used 等等可能问题。保证接口、消息重复下不会有问题的前提下,上游一般都会有重试机制,2~3 次重试均失败之后,可以打错误日志报警,可以让这些没有请求成功的消息在内存/磁盘上堆积,在合适的时机再次进行重发。经验来看,这种错误发生的并不频繁:一天 x 次,x 是个位数。所以我们哪怕在内存里都堆积起来。。虽然不靠谱,但也算是解决之道。无论哪种方式都可以保证下游绝对能收到这些消息。

理论归理论,现实世界有很多互联网公司的业务都是用胶水语言,例如 PHP 来写的。如果 PHP 做的是你的上游,同时又不是很关心这些状态流日志,那一般的做法是:

  1. 订单状态发生改变时,按照下游的要求拼字段,并以 RPC 或 MQ 的方式将消息发出
  2. 如果消息发送失败,重试 2~3 次,之后打印错误日志。
  3. 因为我不能让这个给下游提供数据的操作阻塞主流程,哪怕是我忘了打日志,那主流程也是要继续的。

受限于 PHP 的运行模型,我们没有办法做到像其它语言在金融、支付类系统中那样,消息先落库,失败的话在后台反复重发。

虽然这种问题在国内自建机房,网络情况比较好的公司内一般一天也就遇到个位数的发送失败。但问题在于,这样个位数的失败,会使整个数据链路可靠性低于 100%。如果发生大规模的网络分区或者 MQ 故障,那么可能这些消息可能就完全丢掉了(因为上游不关心状态流转日志,他可能只关注订单的最终状态,所以一般也不会落入 MySQL)。

如果想要求上游系统对消息的必达性进行保障,那么上游系统实际上可能还要另外写脚本,这显然不太现实。如果非要在不换语言的前提下出一个解决方案,理论上也是可以的。就是我们让 PHP 也支持消息堆积,只要将消息堆积到 MySQL 里即可。下游可以通过别的途径,比如 binlog 重新拿到这部分失败的数据,但是这件事情可能对于一般的业务系统来说,会觉得我不关心的事情还要做这么复杂,那我肯定是吃饱了撑的没事干。此外,对于订单量级在千万级,并且订单状态有很多的业务系统,如果把所有的订单状态变更日志都落盘了,那也是一笔不小的开销,实际操作起来也不得不考虑在内。

所以你看到了,有些事情并不见得从技术角度来讲有解决方案,就一定可以很轻松地解决。如果你是下游系统的维护者,同时遇到了类似本文中提到的问题,等待着你的可能依然是每过段时间就要去查这种没什么意义的 case 的窘境。

当然,这里的问题虽然是客观存在,但不一定在所有场景下都会成为问题。很多公司的数据计算和统计,并不是需要对用户公开。这些数据可能只是为了拿来做一些后台的促销或者推荐决策,这种场景下即使数据偶尔出错,多个几少个几也无所谓,反正用户又看不到,而且一般也不会影响数据大盘。我们常常看到一些做分布式计算的人大吹特吹各种流式计算框架,但其大多应用场景也并不是我们本文说到的必须计算正确的场景。

技术都是有局限性的,使用时应该对其场景有一定的判断力。

Xargin

Xargin

If you don't keep moving, you'll quickly fall behind
Beijing