某个群里因为消息队列和丢消息吵起来了,有人觉得 kafka 没有 ack,所以客户端缓冲会导致丢消息。
这理论有点牵强,即使真的丢消息也不会是因为客户端的缓冲问题。刚好之前仔细读过官方讲 Replication 的一篇 Kafka Replication 的 wiki,不是很长,但很详细地介绍了 Kafka 在做 producer ack 和 replication 的一些折衷。周末无事也就顺便翻译如下吧~以后查着也方便。
Kafka Replication High-level Design
为 kafka 加上副本是基于更强的持久性和高可用的考虑。我们希望在 server failure 的情况下,依然可以保证所有成功 publish 的消息都不会丢失,并且可以被消费。这里的 server failure 可以是机器故障,程序错误,或者更普遍的软件升级。我们有下面一些高层设计目标:
可配置的持久化保证:例如一个有很多关键数据的应用,可以选择更强的持久化保证,但在这样的保证下也会导致写入延迟提升,另外一个有存储了大量“软状态”(soft-state)数据的应用则可以选择弱一些的一致性保证和更低的写入延迟。
自动化的复制管理:我们想要简化 broker 服务器的分配,并且能够增量地对集群进行扩展。
这里主要需要解决两个方面的问题:
如何均匀地将 partition 的副本分配到 broker server 上?
对于一个给定的 partition,如何将消息扩散(propagate)到所有副本上?
副本定位 Replica placements
初始位置 Initial placement
只有创建 topic 命令,按照存活 broker(手动创建 topic 的命令)来决定 partition 分布; rebalance 命令则会按照集群拓补结构重新规划 partiton 分布, 并按照目标执行 partition 移动。这句原文感觉有点念不通啊。。。
我们首先用管理 api 来创建初始的 broker 集合:
create cluster with brokers broker-0, broker-1, broker2
然后使用另外的管理 api 来创建一个新的 topic:
create topic topicX with 100 partitions
经过这步操作,下面的信息就会被注册到 zookeeper 里了:
-
一份 broker 的列表。
-
一份 topic 的列表,对每个 topic,还会有一个 partition 的列表。
为了更好的负载均衡,我们稍微"过量"地 partition 一个topic。一般来说,partition 数会比 server 数要多(译注:这里应该说的是 partition 总数吧)。对于每一个 topic 来说,会平均地将 partition 分布到各个 broker 上去。首先将 broker 和 partition 分别进行排序。如果有 n 个 broker,我们会把第 i 个 partion 分配到 i % n 个上去。这个 partition 的第一个副本也就会驻留在这个 broker 上,并被当做这个partition 的 "preferred 副本"了。其它的副本也希望能够用类似的方式来分布,以使 broker 挂掉的时候负载可以均匀地分布到所有存活的 broker 去,而不是单独的一个 broker。为了达到上述目标,先设想有 m 个 partition 被分配给了 broker i。第 k 个 partition 的第 j 个副本会被分配给 broker (i + j + k) % n。
紧跟着的图展示了partition p0 ~ p14 的的副本在 broker-0 ~ broker-4 的分布情况。在这个例子中,如果 broker-0 挂了,存活下来的 p0,p1 和 p2 仍然可以提供服务。这些 partition 副本的分布信息会被存储在 zookeeper 里。
动态添加 broker (Incrementally add brokers online)
我们希望能够通过类似下面的管理命令来动态增加 broker 集合中的节点数。
alter cluster add brokers broker-3, broker-4
新的 broker 加入时,集群会从现有的 broker 上自动移动一些 partition 到新的 broker 上。我们的目标是能够在保持均衡负载的同时能够最小化数据的移动量。这里使用了一个独立的 协调进程(coordinator process)来完成 rebalance,算法在下面给出。
下线 broker (Take brokers offline)
通过将在线上的 broker 拉下线来收缩集群规模也是常见的需求,例如下面的管理命令:
alter cluster remove brokers broker-1
下线 broker-1 会启动新的分配任务,将原来在 broker-1 上的 partition 重新进行分配。重分配任务一结束,broker-1 就会被正式拉下线(offline)。这个命令也会删除 broker-1 的状态变更路径(原文是 state change path for broker-1,我感觉说的是 zk 里存的 broker 信息吧)。
数据副本 (Data replication)
kafka 允许 client 端选择同步、异步 replication。异步 replication 时,一旦有一个 replica (一般就是leader) 收到了消息就会发回 ack。同步模式则会以最大努力(best effort)来 ack,这种模式下只会当消息到达多个 replica 才会发回 ack(这里的replica其实是指 partition 的 repica 对应的 broker)。当 client 端向某个 topic 发布消息时,server 端需要将消息扩散到所有 replica。这时需要确定:
1. 如何对消息进行扩散;
2. 在向 client 发送 ack 之前,需要有多少 replica 已经接收到了这条消息。
3. 当一个 replica 挂了的时候会发生啥。
4. 当挂了的 replica 又回来了!的时候会发生啥。
在下面一节中会介绍现在业界已经存在的复制策略。然后再来讲 kafka 里的同步和异步复制策略。
相关工作 Related work
与 replica 保持同步有两种常见的策略:primary-backup 复制和 quorum-based 复制(译注:这就不翻译了,大意就是基于主备的复制和基于选举的复制)。两种情况下节点角色都被设计为一个 leader 和一群 follower节点(译注:曾经的 master 和 slave 现在都政治不正确了)。所有的写请求都是和 leader 节点通信,然后 leader 节点把写扩散到 follower 节点去。
在主备复制模式中,leader 会等待所有的 replica 写都结束后再对 client 发 ack。如果一个 replica挂了,leader就把他从当前的组中剔除掉,并写入到剩余的 replica 中。挂掉的 replica 如果恢复了并且赶上了 leader 最新的进度(译注:应该是加回来复制到了最新的 offset)就可以重新加回到组里。有 f 个节点的主备模式最多可以容忍 f-1 个机器挂掉。
基于选举的复制,leader 则会等待大多数节点写完成。replica 组的大小不会因为某些 replica 挂掉就变化(译注:就是说我始终认为集群有那么多节点,如果需要计算的时候还是拿挂掉之前的节点数来计算)。例如一个有 2f+1 副本的集群,基于选举的复制可以容忍 f 个节点挂掉。如果 leader 自己挂掉了,那么至少需要 f+1 个节点重新选举出一个新的 leader。
这两种方式都有自己的权衡:
1. 基于选举的方式相比基于主备的方式有更低的写入延迟。在基于主备的方式中,任意一个 replica 节点的延迟(例如长时间的GC)都会提升整体的写入延迟,但前者没有这个问题。
2. 给定任意数量的节点,主备的方式能够容忍更多的机器挂掉。
3. 基于主备的方式中把 replication factor 设置为2依然能够很好地工作(就是有俩节点)。但在基于选举复制的方式中,则需要两个节点都活着才是系统可用(2/2+1)。
Kafka 选择了基于主备的复制,由于这种方式能够容忍更多机器挂掉,并且对于两节点也可以工作。当然这种情况下如果一个 replica 挂掉或者变慢了会有打嗝(译注:这比喻。。不能理解,大概就是抖动的意思吧。。)的现象。但这些是比较少见的情况,而且打嗝的时间也可以通过调整 timeout 参数来降低。
同步复制 Synchronous replication
同步复制使用的是经典的主备方式。每一个 partition 有 n 个 replica 且可承受 n-1 个 replica 失效。其中一个 replica 被选举为 leader,其余的则是 follower。leader 会维护一个 in-sync replica (ISR)的集合:是和 leader 的进度完全一致的那些 follower(译注:应该就是offset一致)。对于每一个 partition,在 zk 里存储了当前的 leader 和当前的 ISR。
每一个 replica 在本地 log 里存储 message,并在 log 中维护了一些重要的 offset 位置。Log end offset (LEO) 代表的是 log 的结尾。High watermark (HW) 代表最后一条 committed 的 message。每条日志会周期性地同步到磁盘。已经 flush 的 offset 之前的所有数据可以确保已被持久化到磁盘。之后我们会看到,flush 的 offset 可能在 HW 之前,也可能是之后。
写入 Writes
向某个 partition 发送消息的过程,client 首先从 zk 里找到 partition 的 leader,并将消息发给这个 leader。leader 把消息写入到本地 log。每个 follower 会用简单的 socket 通道不断地从 leader 拉取新消息。这种方式可以确保 follower 接收到消息的顺序与 leader 中写入的顺序完全一致。follower 将每条接收到的消息写入到它自己的日志并给 leader 返回一个 ack。一旦 leader 从所有的 ISR 中接收到了 ack,这条 message 就算被 committed 了。这时候 leader 会挪动 HW,并向 client 端返回 ack。为了性能考虑,这里 follower 会在消息写入到内存的时候就向 leader 发送 ack。所以对于每一条 committed 的 message,我们能够确保这条消息是被存储在多个 replica 的内存中的。但并不保证哪个 replica 已经把消息刷到了磁盘。考虑到相关的失败是很少见的,这种方式能够在响应时间和持久化之间获得较好的平衡。未来可能会考虑增加更强的持久化保证选项。
leader 也会周期性地广播 HW 到所有的 follower。follower 对于这个广播的响应可以在下一次 follower 进行拉取的时候直接返回给 leader(译注:这里有个piggybacked的术语,挺有意思的)。每个 replica 会不时地将其 HW 也刷到磁盘上去。
读取 Reads
出于简洁考虑,读取总是和 leader 通信的。只有 HW 之前的消息才会暴露给读取方。
失败场景 Failure scenarios
follower 挂掉 Follower failure
经过配置好的 timeout 时间之后,leader 会把失效的 follower 从它的 ISR 中移除,之后的写入会在剩下的 ISR 进行。如果失效的 follower 恢复的时候,这个follower 会先把 HW 之前的 log 都 truncate 掉。然后开始追赶 leader 在这个 HW 之后的所有消息。当 follower 完全赶上的时候,leader 会把他加入到 ISR 列表中。
Leader failure
有三种 leader 挂掉的情况需要考虑:
-
leader 在写入本地 log 之前就崩溃了。这种情况下 client 端会超时,并向新的 leader 发送这条消息。
-
leader 在把消息写入到本地 log 之后,在给客户端返回 ack 之前崩溃了。
a. 需要保证原子性:要么所有的 replica 写入消息完成,要么就都没有写入成功。
b. client 端会重试发送消息。这种情况下,系统理论上需要保证消息不会被重复写两次。一个 replica 可能会在已经把消息写入到了它的本地 log,并已经将消息 committed(译注:移动 HW 吧)之后,被选举成了新的 leader。
-
leader 在发送回 ack 响应之后崩溃了。这种情况下新的 leader 被选举出来开始接收请求。
当上述情况发生时,需要按照下列步骤重新选举一个新的 leader。
-
ISR 列表中每一个存活的 replica 在 zookeeper 中注册自身。
-
第一个注册到 zk 的 replica 成为新的 leader。leader 选择它的 LEO 作为新的 HW。
-
每一个 replica 在 zk 里注册对 leader 节点的监听,这样只要节点发生变化就会收到 event。每次 replica 收到新 leader 的消息时:
a. 如果 replica 不是新的 leader(它必须是一个 follower),那么就把 log truncate 到 HW 的位置,并开始追赶 leader 的日志。
-
leader 等待 ISR 中所有的存活 replica 追上了进度,或者配置的时间已经经过/超时。leader 将当前的 ISR 写入到 zookeeper 并将读写功能开放到 client。
(Note, 初始 ISR 是空的时候,任何一个 replica 都有可能成为 leader.)
Asynchronous replication
为了支持异步复制,leader 接收到消息后一写到本地的 log 就可以马上返回 ack。这里唯一需要说明的是在 follower 进行 catchup 的阶段,可能需要 truncate 掉它 HW 之前的数据。这是因为异步复制的情况下,一条消息没有任何保证能够在 broker 失效依然存活。
Open Issues
第二种 leader failure 的情况下,原子性怎么保证?
如何避免同一个 partition 同时存在多个 leader?
如果 broker 分布在多个机架上,如何保证至少有一个 replica 分布在不同的机架上?(译注:想在一个机架出问题的时候其它还能提供服务吧)
Kafka Replication Detailed Design
下面是不同的 proposal 的具体实现。在 0.8 版本中的实现是基于 v3 协议
具体的参考这里:
https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3