irpas技术客

Java客户端访问Kafka_雷皓_java 访问kafka

网络投稿 1651

public class MsgProducer { 2 public static void main ( String [] args ) throws InterruptedException , ExecutionException { 3 Properties props = new Properties (); 4 props . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094" ); 5 /* 6 发出消息持久化机制参数 7 ( 1 ) acks = 0 : 表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。 8 ( 2 ) acks = 1 : 至少要等待 leader 已经成功将数据写入本地 log ,但是不需要等待所有 follower 是否成功写入。就可以继续发送下一条消息。 这种情况下,如果 follower 没有成功备份数据,而此时 leader 9 又挂掉,则消息会丢失。 10 ( 3 ) acks =‐ 1 或 all : 这意味着 leader 需要等待所有备份 ( min . insync . replicas 配置的备份个数 ) 都成功写入日志,这种策略会保证只要有 一个备份存活就不会丢失数据。 11 这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。 12 */ 13 props . put ( ProducerConfig . ACKS_CONFIG , "1" ); 14 // 发送失败会重试,默认重试间隔 100ms ,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那 边做好消息接收的幂等性处理 15 props . put ( ProducerConfig . RETRIES_CONFIG , 3 ); 16 // 重试间隔设置 17 props . put ( ProducerConfig . RETRY_BACKOFF_MS_CONFIG , 300 ); 18 // 设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是 33554432 ,即 32MB 19 props . put ( ProducerConfig . BUFFER_MEMORY_CONFIG , 33554432 ); 20 //kafka 本地线程会从缓冲区取数据,批量发送到 broker , 21 // 设置批量发送消息的大小,默认值是 16384 ,即 16kb ,就是说一个 batch 满了 16kb 就发送出去 22 props . put ( ProducerConfig . BATCH_SIZE_CONFIG , 16384 ); 23 // 默认值是 0 ,意思就是消息必须立即被发送,但这样会影响性能 24 // 一般设置 100 毫秒左右,就是说这个消息发送完后会进入本地的一个 batch ,如果 100 毫秒内,这个 batch 满了 16kb 就会随 batch 一起被发送出 去 25 // 如果 100 毫秒内, batch 没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长 26 props . put ( ProducerConfig . LINGER_MS_CONFIG , 100 ); 27 // 把发送的 key 从字符串序列化为字节数组 28 props . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName ()); 29 // 把发送消息 value 从字符串序列化为字节数组 30 props . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName ()); 31 32 Producer < String , String > producer = new KafkaProducer <> ( props ); 33 34 int msgNum = 5 ; 35 CountDownLatch countDownLatch = new CountDownLatch ( msgNum ); 36 for ( int i = 1 ; i <= msgNum ; i ++ ) { 37 Order order = new Order ( i , 100 + i , 1 , 1000.00 ); 38 // 指定发送分区 39 ProducerRecord < String , String > producerRecord = new ProducerRecord < String , String > ( "order‐topic" 40 , 0 , order . getOrderId (). toString (), JSON . toJSONString ( order )); 41 // 未指定发送分区,具体发送的分区计算公式: hash(key)%partitionNum 42 /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my‐replicated‐topic" 43 , order . getOrderId (). toString (), JSON . toJSONString ( order )); */ 44 45 // 等待消息发送成功的同步阻塞方法 46 /*RecordMetadata metadata = producer.send(producerRecord).get(); 47 System . out . println ( " 同步方式发送消息结果: " + "topic‐" + metadata . topic () + "|partition‐" 48 + metadata . partition () + "|offset‐" + metadata . offset ()); */ 49 50 // 异步方式发送消息 51 producer . send ( producerRecord , new Callback () { 52 @Override 53 public void onCompletion ( RecordMetadata metadata , Exception exception ) { 54 if ( exception != null ) { 55 System . err . println ( " 发送消息失败: " + exception . getStackTrace ()); 56 57 } 58 if ( metadata != null ) { 59 System . out . println ( " 异步方式发送消息结果: " + "topic‐" + metadata . topic () + "|partition‐" 60 + metadata . partition () + "|offset‐" + metadata . offset ()); 61 } 62 countDownLatch . countDown (); 63 } 64 }); 65 66 // 送积分 TODO 67 68 } 69 70 countDownLatch . await ( 5 , TimeUnit . SECONDS ); 71 producer . close (); 72 } 73 }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #访问kafka #PUBLIC #class #MsgProducer #2