irpas技术客

window下kafka安装及其使用_MetaSmiles_kafka安装完整步骤windows

大大的周 6521

目录

1.kafka安装

1.1安装JDK1.8

1.2安装Zookeeper3.7

1.3 Kafka2.13安装

2.命令行测试

3.客户端程序开发

3.1?openssl编译

3.3?生产者

3.4 消费者


1.kafka安装

本地装了一套kafka的环境:

序号

名称

备注

下载链接

1

JDK1.8

Java开发环境

https://·/apache/zookeeper/

3

Kafka2.13

Kafka开发环境

http://kafka.apache.org/downloads.html

下载如下:

?

1.1安装JDK1.8

步骤1:双击安装包,直到安装完成,

步骤2:需要添加以下的环境变量(右键点击“我的电脑” -> "高级系统设置" -> "环境变量"?),如下:

JAVA_HOME:?C:\Program Files\Java\jdk1.8.0_171 (jdk的安装路径)

Path: 在现有的值后面添加"; %JAVA_HOME%\bin"

步骤3:1.3 打开cmd运行 "java -version" 查看当前系统Java的版本:

1.2安装Zookeeper3.7

步骤1:解压安装包apache-zookeeper-3.7.0-bin.tar.gz,

步骤2:打开zookeeper-3.4.13\conf,把zoo_sample.cfg重命名成zoo.cfg,从文本编辑器里打开zoo.cfg,把dataDir的值改成“./zookeeper-3.4.13/data”

步骤3:添加如下系统变量:

ZOOKEEPER_HOME: D:\kafkaPrestduy\apache-zookeeper-3.7.0-bin (zookeeper目录)

Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%\bin;"

1.3 Kafka2.13安装

步骤1:kafka_2.13-3.1.0.tgz解压

步骤2:打开kafka_2.11-2.0.0\config,从文本编辑器里打开 server.properties。把 log.dirs的值改成 “./logs”

2.命令行测试

步骤1:启动zookeeper,进入目录:D:\kafkaPrestduy\apache-zookeeper-3.7.0-bin\bin

执行如下:./zkserver

?

步骤2:启动kafka,进入目录:D:\kafkaPrestduy\kafka_2.13-3.1.0\bin\windows

启动服务:?./kafka-server-start.bat ..\..\config\server.properties

?

创建话题:

.\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

?

创建生产者:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

创建消费者:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

查看话题:

?./kafka-topics.bat --list ??--bootstrap-server localhost:9092

3.客户端程序开发

librdkafka依赖openssl,zlib与zstd库,所以先编译依赖,然后编译librdkafka

各个库使用版本:

库名

下载地址

openssl-1.0.2a

/index.html

librdkafka-1.4.x

https://github.com/edenhill/librdkafka

zlib

NuGet获取(解决方案右键->获取NuGet)

zstd

NuGet获取(解决方案右键->获取NuGet)

备注:zlib与zstd:D:\kafkaPrestduy\librdkafka-1.4.x\librdkafka-1.4.x\win32\packages

3.1?openssl编译

OpenSSL编译在vs2019,首先下载/index.html下,找并下载:openssl-1.0.2a

步骤1:下载编译环境Perl和NASM,双击安装即可

Download & Install Perl - ActiveState

https://·/edenhill/librdkafka,下载并解压

步骤2:\librdkafka-1.4.x\win32\librdkafka.sln下使用vs2019打开

步骤3:编译librdkafka项目,点击编译,报错如下:

?Error This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see 必应

解决方法:

找到 项目文件librdkafka.csproj,打开后,移除下面:

<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> ??<PropertyGroup> ????<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. ?For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> ??</PropertyGroup> ??<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" /> </Target>

2.zlib和zstd连接失败

在\librdkafka-1.4.x\win32\packages目录下,有zlib和libzstd编译好的库,只添加包含头文件和附加库目录,以及链接lib文件。

Zlib头文件:

\librdkafka-1.4.x\win32\packages\zlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8\build\native\include

zlib库文件:

librdkafka-1.4.x\win32\packages\zlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8\lib\native\v140\windesktop\msvcstl\dyn\rt-dyn\x64\Release

libzstd头文件:

\librdkafka-1.4.x\win32\packages\confluent.libzstd.redist.1.3.8-g9f9630f4-test1\build\native\include

Libzstd库文件:

\librdkafka-1.4.x\win32\packages\confluent.libzstd.redist.1.3.8-g9f9630f4-test1\build\native\lib\win\x64

3.确实openssl缺失

找到opensssl编译生成目录,添加包含头文件和附加库目录,以及链接lib文件

步骤4:编译libkafka,文件生产在\librdkafka-1.4.x\win32\outdir\v142\x64\Release

?

3.3?生产者

官方精简:

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <signal.h> #include <string.h> #include "rdkafka.h" static volatile sig_atomic_t run = 1; static void stop(int sig) { run = 0; fclose(stdin); } static void dr_msg_cb(rd_kafka_t* rk,const rd_kafka_message_t* rkmessage, void* opaque) { if (rkmessage->err) { fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); } else { fprintf(stderr, "Message delivered % d bytes partition % d\n", rkmessage->len, rkmessage->partition); } } int main(int argc, char** argv) { rd_kafka_t* rk; rd_kafka_conf_t* conf; char errstr[512]; char buf[512]; const char* brokers; const char* topic; brokers = "localhost:9092"; topic = "test_2"; conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); return 1; } rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr,"Failed to create new producer: %s\n", errstr); return 1; } signal(SIGINT, stop); fprintf(stderr,"Type some text and hit enter to produce message\n"); while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); rd_kafka_resp_err_t err; if (buf[len - 1] == '\n') { buf[--len] = '\0'; } if (len == 0) { rd_kafka_poll(rk, 0); continue; } retry: err = rd_kafka_producev(rk,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_VALUE(buf, len),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END); if(err) { fprintf(stderr,"Failed to produce to topic %s: %s\n",topic, rd_kafka_err2str(err)); if(err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { rd_kafka_poll(rk, 1000); goto retry; } } rd_kafka_poll(rk, 0); } rd_kafka_flush(rk, 10 * 1000 ); if (rd_kafka_outq_len(rk) > 0) { fprintf(stderr, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk)); } rd_kafka_destroy(rk); return 0; } 3.4 消费者 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <signal.h> #include <string.h> #include <ctype.h> #include "rdkafka.h" static volatile sig_atomic_t run = 1; static void stop(int sig) { run = 0; } static int is_printable(const char* buf, size_t size) { size_t i; for (i = 0; i < size; i++) if (!isprint((int)buf[i])) return 0; return 1; } int main(int argc, char** argv) { rd_kafka_t* rk=NULL; rd_kafka_conf_t* conf = NULL; rd_kafka_resp_err_t err; char errstr[512]; const char* brokers = NULL; const char* groupid = NULL; char** topics = NULL; int topic_cnt; rd_kafka_topic_partition_list_t* subscription = NULL; int i; brokers = "localhost:9092"; groupid = "101"; char *name = (char*)malloc(10 * sizeof(char*)); strcpy(name,"test_2"); topics = &name; topic_cnt = 1; conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "group.id", groupid,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "Failed to create new consumer: % s\n", errstr); return 1; } conf = NULL; rd_kafka_poll_set_consumer(rk); subscription = rd_kafka_topic_partition_list_new(topic_cnt); for (i = 0; i < topic_cnt; i++) rd_kafka_topic_partition_list_add(subscription,topics[i],RD_KAFKA_PARTITION_UA); err = rd_kafka_subscribe(rk, subscription); if (err) { fprintf(stderr,"Failed to subscribe to %d topics: %s\n",subscription->cnt, rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription); rd_kafka_destroy(rk); return 1; } fprintf(stderr,"Subscribed to %d topic(s), " "waiting for rebalance and messages...\n",subscription->cnt); rd_kafka_topic_partition_list_destroy(subscription); signal(SIGINT, stop); while (run) { rd_kafka_message_t* rkm; rkm = rd_kafka_consumer_poll(rk, 100); if (!rkm) continue; if (rkm->err) { fprintf(stderr,"%% Consumer error: %s\n",rd_kafka_message_errstr(rkm));rd_kafka_message_destroy(rkm); continue; } printf("Message on %s %d at offset %d:\n",rd_kafka_topic_name(rkm->rkt), rkm->partition,rkm->offset); if (rkm->key && is_printable((const char*)rkm->key, rkm->key_len)) { printf(" Key: %.*s\n", (int)rkm->key_len, (const char*)rkm->key); } else if (rkm->key) { printf(" Key: (%d bytes)\n", (int)rkm->key_len); } if (rkm->payload && is_printable((const char*)rkm->payload, rkm->len)) { printf(" Value: %.*s\n", (int)rkm->len, (const char*)rkm->payload); } else if (rkm->key) { printf(" Value: (%d bytes)\n", (int)rkm->len); } rd_kafka_message_destroy(rkm); } fprintf(stderr, "%% Closing consumer\n"); rd_kafka_consumer_close(rk); /* Destroy the consumer */ rd_kafka_destroy(rk); return 0; }

? ? ??运行结果:

参考:

librdkafka编译及简单使用过程简介_一缕阳光宣泄、整个世界的博客-CSDN博客_rdkafka编译

Windows 下编译 OpenSSL_青春不老,奋斗不止!-CSDN博客_openssl编译

用VS2019编译librdkafka库_eamon100的博客-CSDN博客

编译OpenSSL 动态库/静态库以及运行时库的选择_YuHengZuo的博客-CSDN博客

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。