irpas技术客

Kafka——创建无消费者组的消费者消费订阅主题_柯腾__kafka不指定消费组

irpas 5050

前景

最近在编写了一个websocket+kafka的推送订阅组件,类似之前的一篇wesocket+redis的推送订阅组件。 期间遇到了两个问题。

不停地组平衡

由于每次订阅不同的主题都会去生成一个消费者,且同一个服务里博主配置的消费者组是同一个。相当于: 每次订阅不同的主题 = 消费者组会加入一个新的消费者 => 消费者组发生组平衡 => 组内所有消费者都会暂停手头的工作去等待消费者组进行组平衡 这样显然是不合适的。 后来就决定在每次创建消费者的时候都给他指定一个新的消费者组。这样就不会影响到其他消费者消费数据了。

订阅时间过长

在实现完功能后,发现一个问题就是从websocket接收到订阅信息到websocket推送接收到的kafka数据这期间花了3s多,这样会导致使用体验很差。排查了一个,这3s用在了这里: 也就是Request joining group due to: need to re-join with the given member-id。当我创建一个带消费者组的消费者后,它在加入消费者组之前需要消费者组给它分配一个member-id。

提问

为什么消费者组给消费者分配member-id需要3s的时间? 影响这个时间的因素又有哪些?

在我查阅了一番资料以后,并没有得出什么结论。希望知道答案的大佬能给小弟解惑。 既然这条路对于我来说走不通,那就换条路走吧~ 在《Kafka权威指南》第53页第二行中写道“创建不属于任何一个群组的消费者也是可以的”既然加入消费者组需要那么长的时间,那我可不可以不加入消费者组?当个自由人不香嘛~ 当然我们得明确自己的需求,是不是可以不要消费者组,这里就需要了解一下消费者组的作用了,博主所知道的作用有:

  1.确保组内消费者不会消费到同一个主题的同一个分组(保证消费的数据不重复)。   2. 同一个消费者组的成员可以一起消费一个主题(不同分区)里面的数据,比单个消费者消费整个主题的数据压力来的要轻很多。   3.当组内的成员“死亡”的时候,它所负责的分区会交给组内的其他成员。

对比了一下这三点,发现博主好像并不是很需要消费者组。 随即,博主就将创建消费者组的那一块代码中,删去了关于消费者组的一些配置。然后就是启动项目,开始测试。测试中发现会报一个错误:To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration,那是因为博主用的KafkaConsumer的subscribe方法订阅的数据,该方法会调用maybeThrowInvalidGroupIdException方法去检测消费者的消费者组存不存在,不存在则会报上面说的那个错误。我们可以换一种方式订阅:KafkaConsumer的assign方法。

核心代码

websocket相关部分的代码和之前《websocket+redis动态订阅和动态取消订阅》中的几乎是一摸一样的,这里就不贴出来了。

KafkaPubsub @Component @Slf4j public class KafkaPubSub { private KafkaClient kafkaClient = GetBeanUtil.getBean(KafkaClient.class); private boolean pollFlag = true; private KafkaConsumer<String, String> consumer = null; private final ObjectMapper o = new ObjectMapper(); private String message = "{\n" + " \"id\": \"%s\",\n" + " \"msg\": %s,\n" + " \"type\": 0\n" + "}"; public void subAndPoll(String topic){ consumer = kafkaClient.noGroupSubscribe(topic); while (pollFlag) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { WebSocketServer.publish(String.format(message, topic, record.value()), topic); } } consumer.unsubscribe(); } public void unsubscribe(){ this.pollFlag = false; } } KafkaClient @Slf4j public class KafkaClient { private Properties noGroupproperties; public KafkaClient(MessageBrokerProperties messageBrokerProperties) { noGroupproperties = new Properties(); noGroupproperties.put("bootstrap.servers", messageBrokerProperties.getBootstrapServers()); noGroupproperties.put("enable.auto.commit", false); noGroupproperties.put("key.deserializer", messageBrokerProperties.getKeyDeserializer()); noGroupproperties.put("value.deserializer", messageBrokerProperties.getValueDeserializer()); } private KafkaConsumer<String, String> getNoGroupComsumer(){ return new KafkaConsumer(noGroupproperties); } public KafkaConsumer<String, String> noGroupSubscribe(String topic){ KafkaConsumer<String, String> consumer = getNoGroupComsumer(); TopicPartition topicPartition = new TopicPartition(topic, 0); consumer.assign(Arrays.asList(topicPartition)); return consumer; } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #kafka不指定消费组 #joining #group #due #To #need