
Implementing Kafka Producer and Consumer Clients with the librdkafka C++ API

大大的周

1. Introduction to librdkafka:

librdkafka is the C/C++ client library for Apache Kafka, providing producer, consumer, and admin clients.

It is designed for reliable, high-performance message delivery, currently sustaining more than 1 million messages per second produced and 3 million messages per second consumed.

The official README introduces it as: "librdkafka — the Apache Kafka C/C++ client library".

librdkafka/INTRODUCTION.md https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka/examples/ https://github.com/edenhill/librdkafka/tree/master/examples

Usage: include the "rdkafkacpp.h" header for the C++ API (the C API uses "rdkafka.h") in your source files.

2. The librdkafka C++ API:

2.1 RdKafka::Conf::create():

Creates a Conf configuration instance, used to hold the user-specified configuration properties:

```cpp
// namespace RdKafka;
// brief: Create a configuration object.
// RdKafka::Conf --- configuration interface class, used to set configuration
//                   values for producers, consumers, and brokers.
static Conf *create(ConfType type);

enum ConfType {
    CONF_GLOBAL,  // Global configuration
    CONF_TOPIC    // Topic-specific configuration
};
```

Example:

```cpp
RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (m_config == nullptr) {
    std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}

RdKafka::Conf *m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (m_topicConfig == nullptr) {
    std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
```

2.2 Conf::ConfResult set():

The Conf class provides several overloaded set() member functions for assigning values to the various configuration properties:

```cpp
class Conf {
public:
    virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
    virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
    virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
    virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
    // ...
};

enum ConfResult {
    CONF_UNKNOWN = -2,  // Unknown configuration property
    CONF_INVALID = -1,  // Invalid configuration value
    CONF_OK      = 0    // Configuration property was successfully set
};
```

Example:

```cpp
RdKafka::Conf::ConfResult result;
std::string error_str;
RdKafka::Conf *m_config;  // created via RdKafka::Conf::create(), as in 2.1

// Set the "bootstrap.servers" property:
result = m_config->set("bootstrap.servers", "127.0.0.1:9092", error_str);
if (result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
}

// Set the "event_cb" property:
RdKafka::EventCb *m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if (result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
}
```

2.3 RdKafka::Producer::create():

Creates the Producer client:

```cpp
class Producer : public virtual Handle {
public:
    static Producer *create(Conf *conf, std::string &errstr);
};
```

Example:

```cpp
RdKafka::Producer *m_producer;
m_producer = RdKafka::Producer::create(m_config, error_str);
if (m_producer == nullptr) {
    std::cout << "Create Producer failed: " << error_str << std::endl;
}
```

2.4 RdKafka::Topic::create():

Creates a Topic object:

```cpp
class Topic {
public:
    static Topic *create(Handle *base, const std::string &topic_str,
                         const Conf *conf, std::string &errstr);
};
```

Example:

```cpp
RdKafka::Topic *m_topic;
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
if (m_topic == nullptr) {
    std::cout << "Create Topic failed: " << error_str << std::endl;
}
```

2.5 RdKafka::Producer::produce():

```cpp
class Producer : public virtual Handle {
public:
    virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags,
                              void *payload, size_t len,
                              const std::string *key, void *msg_opaque);
    // ... (further overloads)
};

// Use RdKafka::err2str() to translate an error code into a human-readable string.
enum ErrorCode {
    // Errors internal to rdkafka:
    ERR__BEGIN   = -200,  // begin internal error codes
    ERR__BAD_MSG = -199,  // received message is incorrect
    // ...
    ERR__END     = -100,  // end internal error codes

    // Kafka broker errors:
    ERR_UNKNOWN  = -1,    // unknown broker error
    ERR_NO_ERROR = 0,     // success
    // ...
};
```

Example:

```cpp
RdKafka::ErrorCode error_code = m_producer->produce(
        m_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
        payload, len, &key, NULL);

m_producer->poll(0);  // a timeout of 0 means non-blocking; poll(0) is called mainly
                      // to trigger the application-supplied callbacks

if (error_code != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
    if (error_code == RdKafka::ERR__QUEUE_FULL) {
        m_producer->poll(1000);  // if the send failed because the queue is full,
                                 // block and wait for a while
    } else if (error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
        // if the message exceeds the configured maximum size,
        // trim it and send again
    } else {
        std::cerr << "ERR__UNKNOWN_PARTITION or ERR__UNKNOWN_TOPIC" << std::endl;
    }
}
```

2.6 RdKafka::KafkaConsumer::create():

Creates the KafkaConsumer client:

```cpp
class KafkaConsumer : public virtual Handle {
public:
    static KafkaConsumer *create(const Conf *conf, std::string &errstr);
};
```

Example:

```cpp
RdKafka::KafkaConsumer *m_consumer;
m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if (m_consumer == nullptr) {
    std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}
```

2.7 RdKafka::KafkaConsumer::subscribe():

The consumer subscribes to one or more topics:

```cpp
class KafkaConsumer : public virtual Handle {
public:
    virtual ErrorCode subscribe(const std::vector<std::string> &topics);
};
```

Example:

```cpp
std::vector<std::string> topics;
topics.push_back(topic_str);
RdKafka::ErrorCode error_code = m_consumer->subscribe(topics);
if (error_code != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Consumer Subscribe Topics Failed: "
              << RdKafka::err2str(error_code) << std::endl;
}
```

2.8 RdKafka::KafkaConsumer::consume():

The consumer pulls messages for consumption:

```cpp
class KafkaConsumer : public virtual Handle {
public:
    virtual Message *consume(int timeout_ms);
};
```

Example:

```cpp
RdKafka::Message *m_message = m_consumer->consume(5000);
// If no message arrives within 5000 ms, a message carrying the error code
// RdKafka::ERR__TIMED_OUT is returned.
```

3. Implementing a Producer Client with the librdkafka C++ API:

3.1 main_producer.cpp

```cpp
#include <stdio.h>   // sprintf()
#include <unistd.h>  // sleep()
#include "kafka_producer.h"

using namespace std;

int main() {
    KafkaProducer producer("127.0.0.1:9092", "topic-demo", 0);
    sleep(5);
    for (int i = 0; i < 10; i++) {
        char msg[64] = {0};
        sprintf(msg, "%s%4d", "Hello Kafka ", i);  // msg = "Hello Kafka    0", ...
        char key[8] = {0};
        sprintf(key, "%d", i);                     // key = "0", "1", ...
        producer.pushMessage(msg, key);
    }
    RdKafka::wait_destroyed(5000);
    return 0;
}
```

3.2 kafka_producer.h

```cpp
#ifndef __KAFKAPRODUCER_H_
#define __KAFKAPRODUCER_H_

#include <string>
#include <iostream>
#include "rdkafkacpp.h"

class KafkaProducer {
public:
    // explicit: forbids implicit conversions, e.g. a std::string cannot be
    // silently converted into a KafkaProducer through this constructor
    explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);
    ~KafkaProducer();
    void pushMessage(const std::string& msg, const std::string& key);

protected:
    std::string m_brokers;
    std::string m_topicStr;
    int m_partition;
    RdKafka::Conf* m_config;       // RdKafka::Conf --- configuration interface class for
                                   // producer, consumer, and broker settings
    RdKafka::Conf* m_topicConfig;
    RdKafka::Producer* m_producer;
    RdKafka::Topic* m_topic;
    RdKafka::DeliveryReportCb* m_dr_cb;        // delivers the send result after RdKafka::Producer::produce();
                                               // a class whose callback and result handling the application fills in
    RdKafka::EventCb* m_event_cb;              // generic interface through which librdkafka passes
                                               // errors, statistics, logs, etc. to the application
    RdKafka::PartitionerCb* m_partitioner_cb;  // custom partitioner
};

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {  // overrides the virtual dr_cb() of RdKafka::DeliveryReportCb
        if (message.err() != 0) {            // delivery failed
            std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
        } else {                             // delivery succeeded
            std::cerr << "Message delivered to topic: " << message.topic_name()
                      << " [" << message.partition() << "] at offset "
                      << message.offset() << std::endl;
        }
    }
};

/*
Kafka calls DeliveryReportCb::dr_cb() when it returns the send result after
produce(), filling the result into message:

class DeliveryReportCb {
public:
    virtual void dr_cb(Message &message) = 0;  // Message::err() holds the result
    virtual ~DeliveryReportCb() {}
};

Information carried by the returned Message:

class Message {
public:
    virtual std::string errstr() const = 0;      // error description
    virtual ErrorCode err() const = 0;           // error code (enum)
    virtual Topic *topic() const = 0;            // topic the message was sent to, as an RdKafka::Topic object
    virtual std::string topic_name() const = 0;  // name of that topic
    virtual int32_t partition() const = 0;       // partition the message was sent to
    virtual void *payload() const = 0;           // message contents
    virtual size_t len() const = 0;              // message length
    virtual const std::string *key() const = 0;  // message key
    virtual int64_t offset() const = 0;          // message offset
    // ...
};

enum ErrorCode {
    ERR__BEGIN = -200,
    // ...
    ERR_UNKNOWN  = -1,
    ERR_NO_ERROR = 0,  // success
    ERR_OFFSET_OUT_OF_RANGE = 1,
    // ...
};
*/

class ProducerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: "
                      << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG: " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE: " << event.broker_name() << std::endl;
            break;
        }
    }
};

/*
EventCb is the generic interface through which librdkafka returns errors,
statistics, logs, etc. to the application:

class EventCb {
public:
    virtual void event_cb(Event &event) = 0;  // event_cb() for the application to override
    virtual ~EventCb() {}
};

class Event {
public:
    enum Type {
        EVENT_ERROR,
        EVENT_STATS,
        EVENT_LOG,
        EVENT_THROTTLE  // throttle-level signaling from the broker
    };
    enum Severity {  // severity level of the event
        EVENT_SEVERITY_EMERG    = 0,
        EVENT_SEVERITY_ALERT    = 1,
        EVENT_SEVERITY_CRITICAL = 2,
        EVENT_SEVERITY_ERROR    = 3,
        EVENT_SEVERITY_WARNING  = 4,
        EVENT_SEVERITY_NOTICE   = 5,
        EVENT_SEVERITY_INFO     = 6,
        EVENT_SEVERITY_DEBUG    = 7,
    };
    virtual Type type() const = 0;          // event type
    virtual ErrorCode err() const = 0;      // error code
    virtual Severity severity() const = 0;  // log severity level
    virtual std::string fac() const = 0;
    virtual std::string broker_name() const = 0;
    virtual bool fatal() const = 0;         // whether this is a fatal error
    // ...
};
*/

class HashPartitionerCb : public RdKafka::PartitionerCb {
    // Custom producer partitioner: returns a partition id. It hashes the key to
    // pick the target partition (the same scheme as the default partitioner).
public:
    int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                           int32_t partition_cnt, void *msg_opaque) {
        char msg[128] = {0};
        sprintf(msg, "HashPartitionerCb:[%s][%s][%d]",
                topic->name().c_str(), key->c_str(), partition_cnt);
        std::cout << msg << std::endl;
        // The lines above only print a trace; the actual work is generate_hash()
        // below, which produces the partition id to send to.
        return generate_hash(key->c_str(), key->size()) % partition_cnt;
    }

private:
    // The callback's return value must lie between 0 and partition_cnt;
    // on error, return PARTITION_UA (-1).
    static inline unsigned int generate_hash(const char *str, size_t len) {
        unsigned int hash = 5381;
        for (size_t i = 0; i < len; i++) {
            hash = ((hash << 5) + hash) + str[i];
        }
        return hash;
    }
};

/*
class PartitionerCb {
public:
    virtual int32_t partitioner_cb(const Topic *topic, const std::string *key,
                                   int32_t partition_cnt, void *msg_opaque) = 0;
    // custom partitioner interface: topic, key, partition count, opaque
    virtual ~PartitionerCb() {}
};

Note: once the application's derived class overrides partitioner_cb(), it is
Kafka that calls it. The partition_cnt argument is not supplied by the
producer; Kafka looks it up from the information recorded when the topic was
created. Topics are not created by the producer client either; the only method
covered here is the kafka-topics.sh script.

On "creating topics":
-- If the broker-side parameter auto.create.topics.enable is set to true (the
   default), then when a producer sends to a topic that does not yet exist, a
   topic is created automatically with num.partitions partitions (default 1)
   and a replication factor of default.replication.factor (default 1).
-- Likewise, when a consumer starts reading from an unknown topic, or any
   client sends a metadata request for an unknown topic, a topic is created
   according to num.partitions and default.replication.factor.
*/

#endif
```

Summary:

1. The KafkaProducer constructor initializes a producer client from three arguments: (1) the Kafka address the producer connects to, which is a broker "ip:port" address (not the ZooKeeper address); (2) the topic the produced messages will be sent to; (3) the partition the messages go to.

2. RdKafka::Conf is the configuration interface class, used to set configuration values for producers, consumers, and brokers:

```cpp
namespace RdKafka {
class Conf {
public:
    enum ConfType {
        CONF_GLOBAL,  // global configuration (client level, same for all topics)
        CONF_TOPIC    // topic-specific configuration (may differ per topic)
    };
    enum ConfResult {       // return value of RdKafka::Conf::set()
        CONF_UNKNOWN = -2,  // the property passed to set() is not one Kafka defines
        CONF_INVALID = -1,  // the value passed to set() is invalid
        CONF_OK      = 0    // set() succeeded
    };
    static Conf *create(ConfType type);  // creates a Conf instance; a static member function
    virtual Conf::ConfResult set(const std::string &name, const std::string &value,
                                 std::string &errstr);  // sets the property named `name`
    // e.g.: RdKafka::Conf::ConfResult errCode = m_config->set("dr_cb", m_dr_cb, errorstr);
};
}
```

Q: Why create RdKafka::Conf (GLOBAL/TOPIC), RdKafka::Producer, and RdKafka::Topic at all?

A: For the same reason socket() creates a sockfd handle: creating the handle tells the kernel to create the associated instance, and the kernel maintains distinct instances to tell connections apart. Likewise, Kafka needs to distinguish different producers and their differing configurations. The application therefore creates a Producer client instance (the m_producer handle), fills in the desired configuration (m_config), and hands both over; the producer instance is created and maintained internally, so that when the producer pushes messages to a broker, Kafka can process them according to the preconfigured properties.

3.3 kafka_producer.cpp

```cpp
#include "kafka_producer.h"

// e.g. ("192.168.0.105:9092", "topic-demo", 0)
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition) {
    m_brokers   = brokers;
    m_topicStr  = topic;
    m_partition = partition;

    // First create the Conf objects used to configure the producer client:
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (m_config == nullptr) {
        std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
    }
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (m_topicConfig == nullptr) {
        std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
    }

    // Now set the required configuration properties:
    RdKafka::Conf::ConfResult result;
    std::string error_str;

    result = m_config->set("bootstrap.servers", m_brokers, error_str);  // broker address list, "ip:port" format
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
    }

    result = m_config->set("statistics.interval.ms", "10000", error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'statistics.interval.ms' failed: " << error_str << std::endl;
    }

    result = m_config->set("message.max.bytes", "10240000", error_str);  // maximum message size; larger sends fail
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'message.max.bytes' failed: " << error_str << std::endl;
    }

    m_dr_cb = new ProducerDeliveryReportCb;
    result = m_config->set("dr_cb", m_dr_cb, error_str);  // per-message delivery-report callback
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'dr_cb' failed: " << error_str << std::endl;
    }

    m_event_cb = new ProducerEventCb;
    result = m_config->set("event_cb", m_event_cb, error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
    }

    m_partitioner_cb = new HashPartitionerCb;
    result = m_topicConfig->set("partitioner_cb", m_partitioner_cb, error_str);  // custom partitioner
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Topic Conf set 'partitioner_cb' failed: " << error_str << std::endl;
    }

    // Create the Producer client:
    m_producer = RdKafka::Producer::create(m_config, error_str);
    if (m_producer == nullptr) {
        std::cout << "Create Producer failed: " << error_str << std::endl;
    }

    // Create the Topic object, needed later by produce():
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
    if (m_topic == nullptr) {
        std::cout << "Create Topic failed: " << error_str << std::endl;
    }
}

/*
Another relevant parameter:
    partition.assignment.strategy: range, roundrobin
    Consumer-side partition assignment strategy: how the group member elected
    leader assigns partitions to the consumers in the group.
*/

void KafkaProducer::pushMessage(const std::string& msg, const std::string& key) {
    int32_t len = msg.length();
    void *payload = const_cast<void*>(static_cast<const void*>(msg.data()));
    RdKafka::ErrorCode error_code = m_producer->produce(
            m_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
            payload, len, &key, NULL);
    m_producer->poll(0);  // a timeout of 0 means non-blocking; poll(0) mainly
                          // triggers the application-supplied callbacks
    if (error_code != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
        if (error_code == RdKafka::ERR__QUEUE_FULL) {
            m_producer->poll(1000);  // queue full: block and wait for a while
        } else if (error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
            // message exceeds message.max.bytes: trim it and resend
        } else {
            std::cerr << "ERR__UNKNOWN_PARTITION or ERR__UNKNOWN_TOPIC" << std::endl;
        }
    }
}

/*
RdKafka::ErrorCode RdKafka::Producer::produce(
    RdKafka::Topic *topic, int32_t partition,  // topic and partition to produce to
    int msgflags,        // RK_MSG_COPY (copy the payload),
                         // RK_MSG_FREE (free the payload after sending),
                         // RK_MSG_BLOCK (block produce() while the message queue is full)
    void *payload, size_t len,  // message payload and length
    const std::string *key,     // message key
    void *msg_opaque);   // optional per-message opaque pointer supplied by the
                         // application; it is handed back inside the dr_cb
                         // callback and kept internally until then

Return values:
    ERR_NO_ERROR           : the message was successfully enqueued
    ERR__QUEUE_FULL        : the queue is full
    ERR_MSG_SIZE_TOO_LARGE : the message is too large
    ERR__UNKNOWN_PARTITION : the specified partition does not exist
    ERR__UNKNOWN_TOPIC     : the specified topic does not exist

int RdKafka::Producer::poll(int timeout_ms);
    Waits for outstanding produce work. Its other important roles:
    (1) serving Events on the given Kafka handle (in this example, events on
        m_producer are printed);
    (2) triggering the application-supplied callbacks; e.g.
        ProducerDeliveryReportCb is only invoked from within poll().
*/

KafkaProducer::~KafkaProducer() {
    while (m_producer->outq_len() > 0) {  // while the client's "out queue" is non-empty
        std::cerr << "Waiting for: " << m_producer->outq_len() << std::endl;
        m_producer->flush(5000);
    }
    delete m_config;
    delete m_topicConfig;
    delete m_topic;
    delete m_producer;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}

/*
-- flush(): ErrorCode RdKafka::Producer::flush(int timeout_ms);
   flush() first calls poll() to trigger the callbacks the producer registered,
   then waits until all of the producer's outstanding messages have been sent.
-- outq_len(): the length of the "out queue", a member of Handle: messages in a
   producer waiting to be sent to the broker, or acknowledgements in a consumer
   waiting to be sent to the broker.
   Handle is the base class of the producer and consumer classes and represents
   the client handle:
       class Producer      : public virtual Handle { };
       class Consumer      : public virtual Handle { };
       class KafkaConsumer : public virtual Handle { };
*/
```

4. Implementing a Consumer Client with the librdkafka C++ API:

4.1 main_consumer.cpp

```cpp
#include "kafka_consumer.h"

int main() {
    std::string brokers = "127.0.0.1:9092";
    std::vector<std::string> topics;  // set of topics to consume
    topics.push_back("topic-demo");
    std::string group = "consumer-group-demo";  // consumer group
    KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
    consumer.pullMessage();
    RdKafka::wait_destroyed(5000);
    return 0;
}
```

When a producer/consumer client connects to the brokers, it fills in the "bootstrap.servers" parameter.

Q: Why does configuring a single broker address (port 9092) suffice? If the Kafka cluster has several brokers, and the topics the producer/consumer works with span several of them, how does the client learn about the other brokers?
A: bootstrap.servers specifies the list of broker addresses the client uses to connect to the Kafka cluster, in the format host1:port1,host2:port2 — one or more addresses separated by commas; the default value is "". It does not need to list every broker, because the client discovers the remaining brokers from the ones given. Still, it is advisable to configure at least two broker addresses, so that the client can still reach the cluster when any one of them is down.

4.2 kafka_consumer.h

```cpp
#ifndef __KAFKACONSUMER_H_
#define __KAFKACONSUMER_H_

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafkacpp.h"

class KafkaConsumer {
public:
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    ~KafkaConsumer();
    void pullMessage();

protected:
    std::string m_brokers;
    std::string m_groupId;
    std::vector<std::string> m_topicVector;  // one consumer may subscribe to several
                                             // topics at once, hence the vector
    int m_partition;
    RdKafka::Conf* m_config;       // GLOBAL-level configuration (consumer-client level)
    RdKafka::Conf* m_topicConfig;  // TOPIC-level configuration
    RdKafka::KafkaConsumer* m_consumer;    // consumer client instance
    RdKafka::EventCb* m_event_cb;          // Event callback
    RdKafka::RebalanceCb* m_rebalance_cb;  // rebalance callback
};

class ConsumerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            if (event.fatal())  // is this a FATAL error?
                std::cerr << "FATAL ";
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
                      << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(),
                    event.fac().c_str(), event.str().c_str());
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by "
                      << event.broker_name() << " id " << (int)event.broker_id()
                      << std::endl;
            break;
        default:
            std::cerr << "EVENT " << event.type() << " ("
                      << RdKafka::err2str(event.err()) << "): " << event.str()
                      << std::endl;
            break;
        }
    }
};

class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
public:
    // Kafka reports the concrete rebalance event (before/after) through the err
    // argument, and the old/new partition sets through the partitions argument.
    void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition*> &partitions) {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            // ERR__ASSIGN_PARTITIONS: "after the rebalance, before consumption
            // starts"; the consumer may reload offsets from the broker here
            consumer->assign(partitions);  // re-assign() these partitions after the rebalance
            partition_count = (int)partitions.size();
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            // ERR__REVOKE_PARTITIONS: "after consumption stops, before the
            // rebalance"; the application may commit offsets here
            consumer->unassign();  // unassign() these partitions before the rebalance
            partition_count = 0;   // all partitions revoked; reset to 0
        } else {
            std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
        }
    }

private:
    // print all topic/partition pairs
    static void printTopicPartition(const std::vector<RdKafka::TopicPartition*> &partitions) {
        for (unsigned int i = 0; i < partitions.size(); i++) {
            std::cerr << partitions[i]->topic() << "["
                      << partitions[i]->partition() << "], ";
        }
        std::cerr << "\n";
    }

    int partition_count;  // number of partitions this consumer is currently assigned
};

/*
class RdKafka::RebalanceCb {
public:
    virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                              RdKafka::ErrorCode err,
                              std::vector<TopicPartition*> &partitions) = 0;
    virtual ~RebalanceCb() {}
};

Note the element type of std::vector<TopicPartition*> &partitions:

class TopicPartitionImpl {
    std::string topic_;
    int partition_;
};

It carries both the topic and the partition, so consumer->assign() subscribes
to a set of (topic, partition) pairs that may span different topics.
*/

#endif
```

4.3 kafka_consumer.cpp

```cpp
#include "kafka_consumer.h"

KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupId,
                             const std::vector<std::string>& topics, int partition) {
    m_brokers     = brokers;
    m_groupId     = groupId;
    m_topicVector = topics;
    m_partition   = partition;

    // Create the Conf instances:
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (m_config == nullptr) {
        std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
    }
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (m_topicConfig == nullptr) {
        std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
    }

    // Set the configuration properties:
    RdKafka::Conf::ConfResult result;
    std::string error_str;

    result = m_config->set("bootstrap.servers", m_brokers, error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
    }

    result = m_config->set("group.id", m_groupId, error_str);  // consumer group name (string)
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'group.id' failed: " << error_str << std::endl;
    }

    result = m_config->set("max.partition.fetch.bytes", "1024000", error_str);  // maximum fetch size
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << error_str << std::endl;
    }

    // enable.partition.eof: emit RD_KAFKA_RESP_ERR__PARTITION_EOF when the
    // consumer reaches the end of a partition (default: true)
    result = m_config->set("enable.partition.eof", "false", error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'enable.partition.eof' failed: " << error_str << std::endl;
    }

    m_event_cb = new ConsumerEventCb;
    result = m_config->set("event_cb", m_event_cb, error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'event_cb' failed: " << error_str << std::endl;
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    result = m_config->set("rebalance_cb", m_rebalance_cb, error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'rebalance_cb' failed: " << error_str << std::endl;
    }

    // Topic-level configuration:
    result = m_topicConfig->set("auto.offset.reset", "latest", error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Topic Conf set 'auto.offset.reset' failed: " << error_str << std::endl;
    }

    result = m_config->set("default_topic_conf", m_topicConfig, error_str);
    if (result != RdKafka::Conf::CONF_OK) {
        std::cout << "Conf set 'default_topic_conf' failed: " << error_str << std::endl;
    }

    // Create the consumer client:
    m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
    if (m_consumer == nullptr) {
        std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
    }
    std::cout << "Create KafkaConsumer succeed, consumer name: "
              << m_consumer->name() << std::endl;
}

void KafkaConsumer::pullMessage() {
}
```
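The listing ends with pullMessage() left unimplemented. A hedged sketch of what its body could look like, combining the subscribe() and consume() calls introduced in sections 2.7 and 2.8; this is an assumption about the author's intent, not part of the original, and it depends on the KafkaConsumer class above plus librdkafka:

```cpp
// Sketch only: an endless poll loop; a real implementation would add a
// shutdown condition and call m_consumer->close() before returning.
void KafkaConsumer::pullMessage() {
    // Subscribe to the configured topics (section 2.7):
    RdKafka::ErrorCode err = m_consumer->subscribe(m_topicVector);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Subscribe failed: " << RdKafka::err2str(err) << std::endl;
        return;
    }
    while (true) {
        // Block for up to 1000 ms waiting for a message (section 2.8):
        RdKafka::Message *msg = m_consumer->consume(1000);
        switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:  // a proper message
            std::cout << "Read msg at offset " << msg->offset() << ": "
                      << std::string(static_cast<const char *>(msg->payload()),
                                     msg->len())
                      << std::endl;
            break;
        case RdKafka::ERR__TIMED_OUT:  // no message within the timeout; keep polling
            break;
        default:
            std::cerr << "Consume failed: " << msg->errstr() << std::endl;
            break;
        }
        delete msg;
    }
}
```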


