前景提要
HDC调试需求开发(15万预算),能者速来!>>>
spirng-kafka的多consumer问题困扰了我好久,今天项目再次出现
Attempt to heart beat failed since the group is rebalancing, try to re-join group.
这个问题,导致消息接收不了了,查询了很多资料,也看了很多相关文章,
但是并没有找到什么解决方法,也许是我搜索方式错了?
只好上这来提问题,希望有人能帮助我解决。
先说下情况:
kafka版本为 9.0.1
由于项目属于分布式的微服务架构,有时候需要消息能到达每个同服务实例,因此需要实现kafka的广播模式,基于kafka同一Topic下不同Group都会收到消息,所以在一开始在kafka属性配置中使用了实例IP作为groupID: private Map<String, Object> consumerProps() { return new CustomHashMap() .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers) .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) .put(ConsumerConfig.GROUP_ID_CONFIG, "receiveMessage"+ IpUtil.getLocalhostAddress().replace(".", "")) .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100") .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000") .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); }
使用@KafkaListener注解方式进行消息接收,其中 receiveKafkaListenerContainerFactory 是自定义bean @KafkaListener(containerFactory = "receiveKafkaListenerContainerFactory", topics = KafkaTopicName.DEVICE_MESSAGE_TOPIC) public void onMessageListener(MessageTemplate message){ log.info("===> receive [{}]", message.getMessage()); parseMessageAdapter.adapter(message.getType(), message); }
服务启动后,发现kafka只接收到了第一条消息,并且这接收到这条消息后就爆出开头所说的错误: INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group. INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group. INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group. INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]
看错误信息很容易理解是 kafka 发送心跳时发现正在给分区重新分配consumer导致发送失败,尝试重新加入group。
由于知道kafka执行rebalance情况有以下几种:
1:有新的consumer加入
2:旧的consumer挂了
3:coordinator挂了,集群选举出新的coordinator
4:topic的partition新加
5:consumer调用unsubscrible(),取消topic的订阅
再结合单服务时启动这个情况,因此条件条件1和2是有可能符合的,但是仔细检查配置,并没有发现有不妥的地方。
于是又去Kafka中查看了Topic信息,只有一个分区,也没有异常。
如果哪位朋友看出了错误所在,恳请在此留下你的解决方式,如果对于该问题还想了解更多的细节,也可能提出来,我会及时回复,万分感谢!