A case of disconnection caused by a Kafka service update on AWS

Internally, our Kafka consumers and producers are built on sarama, the popular community library. It is said to have quite a few bugs; one cloud vendor even published a doc advising users against it: 为什么不推荐使用Sarama Go客户端收发消息? (Why we do not recommend using the Sarama Go client to send and receive messages?)

But we are already using it, and swapping it out on a whim is not a great option either. When a problem shows up, the right move is to first pin it down, and only then decide whether it really is a quality issue in the lib itself. Blurting out "it's a bug in the lib" without evidence just invites ridicule.

Here is exactly such a scenario:

Our department runs a Kafka message redirector module: for each message received from the upstream topic, it starts three goroutines to forward the message to downstream Kafka clusters, waits until all forwards have finished, commits the ack, and then moves on to the next message. A fairly simple and plain design.
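A minimal sketch of that loop, with made-up names (`upstream`, `downstreams`, `markAck`) standing in for our actual code:

```go
package redirector

import (
	"sync"

	"github.com/Shopify/sarama"
)

// redirect fans each upstream message out to three downstream producers,
// waits for all of them, and only then acks and takes the next message.
func redirect(upstream <-chan *sarama.ConsumerMessage, downstreams [3]sarama.SyncProducer, markAck func(*sarama.ConsumerMessage)) {
	for msg := range upstream {
		var wg sync.WaitGroup
		for _, p := range downstreams {
			wg.Add(1)
			go func(p sarama.SyncProducer) {
				defer wg.Done()
				// forward to one downstream cluster; error handling omitted
				_, _, _ = p.SendMessage(&sarama.ProducerMessage{
					Topic: msg.Topic,
					Value: sarama.ByteEncoder(msg.Value),
				})
			}(p)
		}
		wg.Wait()    // all three forwards done
		markAck(msg) // commit the offset, move on to the next message
	}
}
```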

The downstream Kafka clusters, however, are AWS MSK. MSK runs a routine security upgrade every month as a rolling upgrade: one broker instance is updated at a time until the whole cluster is done, which takes fairly long, often ten-plus minutes.

During the upgrade, though, the redirector stops working entirely, and even after the upgrade finishes it apparently never resumes consuming the upstream topic, which is rather odd. Former colleagues never looked into it seriously; they just assumed that when the brokers restart, the producer fails to reconnect, i.e. a bug in the lib.

In Go, though, there is hardly a problem that pprof plus logs cannot pin down. While the service was stuck, here are the logs from the redirector module:

2022-08-18 09:21:23.742933 I | producer/leader/consumer.msg_record/45 state change to [retrying-1]
2022-08-18 09:21:23.742940 I | producer/leader/consumer.msg_record/45 abandoning broker 1
2022-08-18 09:21:23.742946 I | producer/broker/1 state change to [closed] on consumer.msg_record/45
2022-08-18 09:21:23.843132 I | client/metadata fetching metadata for [consumer.msg_record] from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:21:23.844292 I | producer/leader/consumer.msg_record/45 selected broker 1
2022-08-18 09:21:23.844314 I | producer/broker/1 state change to [open] on consumer.msg_record/45
2022-08-18 09:21:23.844331 I | producer/broker/1 detected epoch rollover, waiting for new buffer
2022-08-18 09:21:23.844380 I | producer/leader/consumer.msg_record/45 state change to [flushing-1]
2022-08-18 09:21:23.844392 I | producer/leader/consumer.msg_record/45 state change to [normal]
2022-08-18 09:21:23.847178 I | client/metadata fetching metadata for [consumer.msg_record] from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:21:23.848216 I | producer/broker/1 state change to [retrying] on consumer.msg_record/45 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.848267 I | Retrying batch for consumer.msg_record-45 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.849777 I | client/metadata fetching metadata for [consumer.msg_record] from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:21:23.850643 I | producer/broker/1 state change to [retrying] on consumer.msg_record/45 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.850664 I | Retrying batch for consumer.msg_record-45 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.851937 I | client/metadata fetching metadata for [consumer.msg_record] from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:21:23.852785 I | producer/broker/1 state change to [retrying] on consumer.msg_record/45 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.852804 I | Retrying batch for consumer.msg_record-45 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022-08-18 09:21:23.852808 I | producer/txnmanager rolling over epoch due to publish failure on consumer.msg_record/45
2022-08-18 09:21:37.168773 I | consumer/broker/26 abandoned subscription to msg_result.u.3_10/0 because consuming was taking too long
2022-08-18 09:25:30.042615 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.042687 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.045357 I | client/broker remove invalid broker #3 with b-3.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.045416 I | Closed connection to broker b-3.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.045767 I | client/metadata found some partitions to be leaderless
2022-08-18 09:35:30.045775 I | client/metadata retrying after 250ms... (3 attempts remaining)
2022-08-18 09:35:30.050145 I | client/metadata fetching metadata for all topics from broker 10.63.30.178:9092
2022-08-18 09:35:30.072330 I | client/metadata fetching metadata for all topics from broker 172.17.13.27:9092
2022-08-18 09:35:30.295867 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.298254 I | client/metadata found some partitions to be leaderless
2022-08-18 09:35:30.298266 I | client/metadata retrying after 250ms... (2 attempts remaining)
2022-08-18 09:35:30.322367 I | client/metadata fetching metadata for all topics from broker 172.17.1.89:9092
2022-08-18 09:35:30.548308 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.550821 I | client/metadata found some partitions to be leaderless
2022-08-18 09:35:30.550832 I | client/metadata retrying after 250ms... (1 attempts remaining)
2022-08-18 09:35:30.800924 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:35:30.803826 I | client/metadata found some partitions to be leaderless
2022-08-18 09:45:29.994128 I | client/metadata fetching metadata for all topics from broker 172.17.8.14:9092
2022-08-18 09:45:30.020108 I | client/metadata fetching metadata for all topics from broker 172.17.12.56:9092
2022-08-18 09:45:30.043423 I | client/metadata fetching metadata for all topics from broker 10.2.19.161:9092
2022-08-18 09:45:30.043440 I | client/metadata fetching metadata for all topics from broker x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.043477 I | client/metadata got error from broker 1 while fetching metadata: write tcp 10.63.141.191:60554->10.63.40.88:9092: write: broken pipe
2022-08-18 09:45:30.043496 I | client/metadata got error from broker 1 while fetching metadata: EOF
2022-08-18 09:45:30.043509 I | Closed connection to broker x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.043514 I | client/brokers deregistered broker #1 at x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.043518 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.043527 I | Closed connection to broker 10.2.19.161:9092
2022-08-18 09:45:30.043537 I | client/brokers deregistered broker #1 at 10.2.19.161:9092
2022-08-18 09:45:30.043540 I | client/metadata fetching metadata for all topics from broker 10.2.1.143:9092
2022-08-18 09:45:30.043560 I | client/metadata got error from broker 3 while fetching metadata: write tcp 10.63.141.191:50358->10.2.1.143:9092: write: broken pipe
2022-08-18 09:45:30.043577 I | Closed connection to broker 10.2.1.143:9092
2022-08-18 09:45:30.043580 I | client/brokers deregistered broker #3 at 10.2.1.143:9092
2022-08-18 09:45:30.043584 I | Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.
2022-08-18 09:45:30.043587 I | ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022-08-18 09:45:30.043599 I | client/metadata fetching metadata for all topics from broker 10.2.41.146:9092
2022-08-18 09:45:30.044114 I | Connected to broker at 10.2.41.146:9092 (registered as #5)
2022-08-18 09:45:30.045569 I | client/brokers registered new broker #3 at b-3.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.045578 I | client/brokers registered new broker #1 at x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 09:45:30.046439 I | client/brokers registered new broker #1 at 10.2.19.161:9092
2022-08-18 09:45:30.046447 I | client/brokers registered new broker #3 at 10.2.1.143:9092
2022-08-18 09:45:30.050138 I | client/metadata fetching metadata for all topics from broker 10.63.30.178:9092
2022-08-18 09:45:30.072225 I | client/metadata fetching metadata for all topics from broker 172.17.13.27:9092
2022-08-18 09:45:30.322365 I | client/metadata fetching metadata for all topics from broker 172.17.1.89:9092
2022-08-18 09:55:29.994048 I | client/metadata fetching metadata for all topics from broker 172.17.8.14:9092
2022-08-18 09:55:30.020108 I | client/metadata fetching metadata for all topics from broker 172.17.12.56:9092
2022-08-18 09:55:30.039318 I | Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.
2022-08-18 09:55:30.039332 I | ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022-08-18 09:55:30.039368 I | client/metadata fetching metadata for all topics from broker 10.2.42.220:9092
2022-08-18 09:55:30.040621 I | Connected to broker at 10.2.42.220:9092 (registered as #6)
2022-08-18 09:55:30.042832 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 10:05:30.039285 I | Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.
2022-08-18 10:05:30.039298 I | ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022-08-18 10:05:30.040821 I | Connected to broker at 10.2.1.143:9092 (registered as #3)
2022-08-18 10:05:30.042727 I | client/metadata fetching metadata for all topics from broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 10:05:30.042763 I | client/metadata got error from broker 2 while fetching metadata: write tcp 10.63.141.191:52174->10.63.42.119:9092: write: broken pipe
2022-08-18 10:05:30.042794 I | Closed connection to broker x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 10:05:30.042803 I | client/brokers deregistered broker #2 at x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 10:05:30.042807 I | Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.
2022-08-18 10:05:30.042810 I | ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022-08-18 10:05:30.042820 I | client/metadata fetching metadata for all topics from broker x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092
2022-08-18 10:05:30.044183 I | Connected to broker at x-1.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092 (registered as #1)
2022-08-18 10:05:30.047206 I | client/brokers registered new broker #2 at x-2.kafka.c2.kafka.ap-southeast-1.amazonaws.com:9092

Note the key line in there:

2022-08-18 09:21:37.168773 I | consumer/broker/26 abandoned subscription to msg_result.u.3_10/0 because consuming was taking too long

Because consuming took too long, the client also dropped the subscription to the upstream topic. Why did it take so long? A quick look at pprof shows the goroutine blocked in SendMessages:

1 @ 0x439480 0x407c47 0x40790b 0x1007e8f 0x10d0390 0x8ef3b7 0x469301
#	0x1007e8e	github.com/Shopify/sarama.(*syncProducer).SendMessages+0x10e				/home/jenkins/agent/workspace/golang/vendor/github.com/Shopify/sarama/sync_producer.go:118
#	0x10d038f	git/micro/gomod/pkg/kfk/kfkclient.(*Environment).SendMessages.func1+0x5f	/home/jenkins/agent/workspace/golang/vendor/git/micro/gomod/pkg/kfk/kfkclient/env.go:72
#	0x8ef3b6	git/svc/go/pkg/std.GoChan.func1+0x26						/home/jenkins/agent/workspace/golang/vendor/git/svc/go/pkg/std/job.go:34

1 @ 0x439480 0x449533 0x8ee644 0x10cf4ec 0x10d16be 0x107bb10 0x1121314 0x8ef44f 0x469301
#	0x8ee643	git/svc/go/pkg/std.RunWithContext+0xe3						/home/jenkins/agent/workspace/golang/vendor/git/svc/go/pkg/std/job.go:53
#	0x10cf4eb	git/micro/gomod/pkg/kfk/kfkclient.(*Environment).SendMessages+0x2cb		/home/jenkins/agent/workspace/golang/vendor/git/micro/gomod/pkg/kfk/kfkclient/env.go:71
#	0x10d16bd	git/micro/gomod/pkg/kfk/kfkmsgq/v2.topic.Enqueue+0x22d				/home/jenkins/agent/workspace/golang/vendor/git/micro/gomod/pkg/kfk/kfkmsgq/v2/topic.go:42
#	0x107bb0f	git/micro/e/internal/pkg/eventsvc.(*Impl).ProduceEvent+0x42f		/home/jenkins/agent/workspace/golang/internal/pkg/eventsvc/_produce.go:29
#	0x1121313	git/micro/e/internal/pkg/bizevent/order.onResult.func3+0x183	/home/jenkins/agent/workspace/golang/internal/pkg/bizevent/order/_result.go:99
#	0x8ef44e	git/svc/go/pkg/std.GoWG.func1+0x4e							/home/jenkins/agent/workspace/golang/vendor/git/svc/go/pkg/std/job.go:45

So the goroutine that sends messages is blocked. From the logs, the produced messages could not be delivered, which made the library conclude that consuming was taking too long and abandon the subscription to the upstream topic.
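As far as I can tell, the "consuming was taking too long" behavior corresponds to sarama's Consumer.MaxProcessingTime (100ms by default): if a fetched message is not consumed within that window, the broker consumer abandons the partition subscription. A minimal config sketch; the 500ms value is purely illustrative, not a recommendation:

```go
package redirector

import (
	"time"

	"github.com/Shopify/sarama"
)

func newConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	// The knob behind "abandoned subscription ... because consuming was
	// taking too long": how long sarama waits for the user to consume a
	// fetched message before giving up on the partition.
	cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond
	// Required when the same config is also used for a SyncProducer.
	cfg.Producer.Return.Successes = true
	return cfg
}
```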

By the time the AWS rolling update finishes, the redirector has already abandoned its upstream subscription, so this state never recovers on its own; the only way out is to restart the service.
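One possible mitigation, sketched under the assumption that failing a forward is preferable to stalling the whole consumer (not what we actually shipped), is to bound the blocking SendMessages call so the consume loop can surface an error instead of hanging:

```go
package redirector

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

// sendWithTimeout unblocks the caller after d even if sarama is still
// retrying internally; the inner goroutine lives on until SendMessages
// itself returns, so this bounds the caller, not sarama.
func sendWithTimeout(p sarama.SyncProducer, msgs []*sarama.ProducerMessage, d time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- p.SendMessages(msgs) }()
	select {
	case err := <-done:
		return err
	case <-time.After(d):
		return fmt.Errorf("produce timed out after %s", d)
	}
}
```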

At this point the problem is mostly pinned down, but one thing is still odd: Kafka has replication and the min.insync.replicas mechanism. Normally you would set a topic's replication factor to 3 and min.insync.replicas to 2, so unless the brokers holding several of the replicas all go down at once, produce should not block indefinitely.

After talking to the colleagues who operate the queues, it turned out that the replica settings of the very topics whose produce was blocking were themselves misconfigured....
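To catch this kind of topic-level misconfiguration earlier, a check along the following lines could work. This is just a sketch built on sarama's ClusterAdmin; the broker addresses and topic name are assumed inputs:

```go
package redirector

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// checkTopic prints each partition's replicas/ISR and the topic's
// min.insync.replicas so a broken replica setup is visible at a glance.
func checkTopic(brokers []string, topic string) error {
	admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig())
	if err != nil {
		return err
	}
	defer admin.Close()

	metas, err := admin.DescribeTopics([]string{topic})
	if err != nil {
		return err
	}
	for _, m := range metas {
		for _, p := range m.Partitions {
			fmt.Printf("%s/%d replicas=%v isr=%v\n", m.Name, p.ID, p.Replicas, p.Isr)
		}
	}

	entries, err := admin.DescribeConfig(sarama.ConfigResource{
		Type:        sarama.TopicResource,
		Name:        topic,
		ConfigNames: []string{"min.insync.replicas"},
	})
	if err != nil {
		return err
	}
	for _, e := range entries {
		fmt.Printf("%s=%s\n", e.Name, e.Value)
	}
	return nil
}
```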

Well, I hope that at least gave you a laugh.
