部门的一个新项目使用了kafka 0.9来进行开发,不得不说目前公司的kafka版本真是够混乱,从0.8~0.10三个版本都存在,而我们部门在选择版本的时候则更加不慎重,追新追时髦,0.9刚出就直接开始使用了。还依赖一个不成熟的spring-kafka库来做开发,开发到快要上线遇到了消费hang住的情况,没有去深查源码,选择了去第三方库的方式。。。用官方的原生sdk来规避这个问题。说来运气不错,这个问题竟然被成功地规避掉了(233。

然后就遇到了紧接着的重复消费的问题,同事遇到了问题首先的疑惑是,为什么我使用了别人的demo代码,但在我的场景里才出现了重复消费呢?

嗯,好吧,我们来查一查。

一个kafka 0.9比较典型的消费一般是长这样的

main:  
for(int i=0; i < partitionNum; i++) {  
    threadN.run(); // 这里在java里可能会用executor之类的库进行封装,更好看一些。
}

in thread code:  
void run() {  
    consumer = new consumer;
    consumer.subscribe(topicname);
    while(true) {
        msgs = consumer.poll(timeout);
        for(int i = 0; i < msgs.size(); i++) {
            // process msg i;
        }
        consumer.commitOffsetSync();
    }
}

在0.9的client端,poll表示主动向kafka拉取消息,然后依次处理所有拉取到的消息,再提交offset。

看着还蛮简单的代码,但是在有大量消息涌入的时候却会导致重复消费。

现象是下面这样(打开debug日志,并去除了一些无用日志):

rebalance  assgin partition 3 to xxx

thread-25  partition-3 offset: 100  
thread-25  partition-3 offset: 101

thread-26  partition-3 offset: 80  
thread-26  partition-3 offset: 81  
thread-25  partition-3 ...  

这个现象蛮奇怪,我们的消费线程明明活得好好地,依然在勤勤恳恳地处理消息,但为什么被判断死亡,rebalance了呢?因为是别人的项目,我在看到这个bug之前也没有研究过0.9的api,所以猜测可能是跟commit的时机有关系,正好kafka0.9也支持consumer的offset自动commit,把自动commit改成true,再试一发~

问题依旧。

看来有必要了解一下在0.9中kafka是如何对consumer进行判活。

我们来看看官方博客中对consumer 0.9的新api的说明,

The consumer’s poll loop is designed to handle this problem. All network IO is done in the foreground when you call poll or one of the other blocking APIs. The consumer does not use any background threads. This means that heartbeats are only sent to the coordinator when you call poll. If your application stops polling (whether because the processing code has thrown an exception or a downstream system has crashed), then no heartbeats will be sent, the session timeout will expire, and the group will be rebalanced.  

有几个要点:

1. 消费者通过向coordinator发送heartbeat来保活,在poll的时机是唯一一次发送heartbeat的时机。

2. 消费者没有任何后台线程,所以发送heartbeat的poll是在前台。也就是说poll也是一个同步的调用。  

其实看到这里基本已经猜得八九不离十了。。这是kafka 0.9的客户端api的设计问题。

本身消息队列就是为了解耦。在很多情况下,消息在下游进行处理就是一项比较耗时的工作,先不说我们这里的业务。最普遍的例如调用第三方提供的短信网关、向用户发送电子邮件等等。但是像0.9的demo这样来写我们的消费者的话,while期间拉取了大量的消息进行处理,非常容易直接超时而被coordinator判活判定为已死亡,从而剔除该消费者,而将该消费者正在消费的partition rebalance给其它消费者。从而导致重复消费。

为了解决这个问题怎么办呢,我们可以把session.timeout.ms调大,但这样会导致消费者确实挂掉的时候没有办法及时发现,在session.timeout.ms之后才能知道,从而导致消息延迟,又引入了别的问题。

显然这个问题也不是只有我们才会想到,官方在0.10对这个问题进行了修复,heartbeat可以在后台发送了。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

但是毕竟我们用的是0.9对不对。

是不是这下又要换sdk了?

这件事情教育我们:

1. 无论是连接池还是什么其它类型的client sdk,心跳一定要支持在后台线程发送

2. 还是要好好看文档啊。。。。我说同学们。对使用的工具心中有数,出了问题才好从之推断结论啊。