irpas技术客

To avoid unwanted headers like spring_json_header_types (by zzhongcy)


If StringSerializer is used, headers can be set directly with getBytes:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Setting the header value with getBytes removes the spring_json_header_types header:

public void sendFoo(String data) {
    Message<String> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topicFoo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, "999")
            .setHeader(KafkaHeaders.PARTITION_ID, 0)
            .setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka".getBytes())
            .setHeader("X-Custom-Header2", "Sending Custom Header with Spring Kafka2".getBytes())
            .build();
    LOG.info("sending message='{}' to topic='{}'", data, topicFoo);
    kafkaTemplate.send(message);
}
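Because the header value travels as raw bytes, the consuming side has to decode it itself (on a ConsumerRecord this would be record.headers().lastHeader("X-Custom-Header").value()). A minimal, broker-free sketch of just the encode/decode round trip, using the header value from the example above:

```java
import java.nio.charset.StandardCharsets;

public class HeaderRoundTrip {
    public static void main(String[] args) {
        // Producer side: the String value is turned into raw bytes up front,
        // so the mapper never needs to record a type hint
        // (no spring_json_header_types header is generated).
        byte[] headerValue =
                "Sending Custom Header with Spring Kafka".getBytes(StandardCharsets.UTF_8);

        // Consumer side: decoding the header bytes is a plain String constructor.
        String decoded = new String(headerValue, StandardCharsets.UTF_8);
        System.out.println(decoded);
    }
}
```

The key point is that byte[] header values pass through the header mapper untouched in both directions; the application owns the charset on both ends.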

Another reference:

Requirement

The application must generate a Spring Message object including id as a header and send the message to a Kafka topic. We have used Spring Cloud Stream to integrate the application with Kafka.

Expected Output

The consumer application that consumes messages from the Kafka topic must be able to read the message header id and validate that its value is the same as the Kafka producer record key.

Actual Output

The consumer application that consumes messages from the Kafka topic always receives the header id value as "None".

Customization

To avoid unwanted headers like spring_json_header_types being generated alongside custom headers by the default mapper class (BinderHeaderMapper), and to override the default header patterns so that id is allowed, a custom header mapper is implemented.

@Slf4j
public class CustomKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

    private final ObjectMapper objectMapper;

    public CustomKafkaHeaderMapper(String... patterns) {
        super(patterns);
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(
                new SimpleModule().addDeserializer(
                        MimeType.class, new CustomKafkaHeaderMapper.MimeTypeJsonDeserializer()));
    }

    @Override
    public void fromHeaders(MessageHeaders messageHeaders, Headers target) {
        messageHeaders.forEach((key, value) -> {
            if (!KafkaHeaders.DELIVERY_ATTEMPT.equals(key) && this.matches(key, value)) {
                Object valueToAdd = this.headerValueToAddOut(key, value);
                if (valueToAdd instanceof byte[]) {
                    target.add(new RecordHeader(key, (byte[]) valueToAdd));
                } else if (valueToAdd instanceof String) {
                    target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(this.getCharset())));
                } else {
                    try {
                        target.add(new RecordHeader(key, new ObjectMapper().writeValueAsBytes(valueToAdd)));
                    } catch (JsonProcessingException e) {
                        logger.error(e,
                                () -> "Could not map " + key + " with type " + value.getClass().getName());
                    }
                }
            }
        });
    }

    @Override
    public void toHeaders(Headers source, Map<String, Object> target) {
        source.forEach(header -> {
            if (KafkaHeaders.DELIVERY_ATTEMPT.equals(header.key())) {
                target.put(header.key(), ByteBuffer.wrap(header.value()).getInt());
            } else {
                target.put(header.key(), new String(header.value(), this.getCharset()));
            }
        });
    }

    private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer<MimeType> {

        private static final long serialVersionUID = 1L;

        MimeTypeJsonDeserializer() {
            super(MimeType.class);
        }

        @Override
        public MimeType convert(JsonNode root, DeserializationContext ctxt) throws IOException {
            if (root instanceof TextNode) {
                return MimeType.valueOf(root.asText());
            }
            JsonNode type = root.get("type");
            JsonNode subType = root.get("subtype");
            JsonNode parameters = root.get("parameters");
            Map<String, String> params = (Map) CustomKafkaHeaderMapper.this.objectMapper.readValue(
                    parameters.traverse(),
                    TypeFactory.defaultInstance()
                            .constructMapType(HashMap.class, String.class, String.class));
            return new MimeType(type.asText(), subType.asText(), params);
        }
    }
}

This customKafkaHeaderMapper bean name is configured in the binder properties as shown below:

spring.cloud.stream.kafka.binder.headerMapperBeanName=customKafkaHeaderMapper
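For that property to resolve, a bean with exactly that name has to exist in the application context. A minimal sketch of the registration (the "*" pattern and the configuration class name are assumptions for illustration; in practice the patterns should be narrowed to the headers the application actually uses):

```java
@Configuration
public class HeaderMapperConfig {

    // Bean name must match spring.cloud.stream.kafka.binder.headerMapperBeanName
    @Bean
    public KafkaHeaderMapper customKafkaHeaderMapper() {
        // "*" matches every header here purely for illustration (assumption);
        // AbstractKafkaHeaderMapper takes simple patterns via its varargs constructor.
        return new CustomKafkaHeaderMapper("*");
    }
}
```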

Analysis

I thought the custom header mapper would serve the purpose and set the id header in the Kafka producer record. But in my initial analysis, I observed the following.

The custom header mapper bean is looked up and captured for use in the code below, inside the createProducerMessageHandler method of the KafkaMessageChannelBinder class:

KafkaHeaderMapper mapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
    mapper = applicationContext.getBean(
            this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
}

But at the end of this method, the handler's mapper is set to a new anonymous KafkaHeaderMapper object that delegates to the custom header mapper when mapping Spring message headers.

// Decompiled from KafkaMessageChannelBinder.createProducerMessageHandler;
// variable names adjusted here so the snippet compiles.
KafkaHeaderMapper customMapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
    customMapper = applicationContext.getBean(
            this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
}
if (customMapper == null) {
    try {
        customMapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
    } catch (BeansException e) {
        // no binder-level mapper registered; fall through
    }
}
final KafkaHeaderMapper userMapper = customMapper;
KafkaHeaderMapper mapper;
if (producerProperties.getHeaderMode() != null
        && !HeaderMode.headers.equals(producerProperties.getHeaderMode())) {
    mapper = null;
} else if (userMapper == null) {
    String[] headerPatterns =
            ((KafkaProducerProperties) producerProperties.getExtension()).getHeaderPatterns();
    if (headerPatterns != null && headerPatterns.length > 0) {
        mapper = new BinderHeaderMapper(
                BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
    } else {
        mapper = new BinderHeaderMapper();
    }
} else {
    mapper = new KafkaHeaderMapper() {
        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            userMapper.toHeaders(source, target);
        }

        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            userMapper.fromHeaders(headers, target);
            BinderHeaderMapper.removeNeverHeaders(target);
        }
    };
}
handler.setHeaderMapper(mapper);

As can be seen in the fromHeaders method of the anonymous wrapper above, the custom header mapper instance is what maps the Spring message headers to the Apache Kafka headers.

Up to this point, I can see the id header entry available in the Apache Kafka headers. After that, BinderHeaderMapper's removeNeverHeaders is invoked with the Apache Kafka headers to explicitly remove the id header from the record.

public static void removeNeverHeaders(Headers headers) {
    headers.remove("id");
    headers.remove("timestamp");
    headers.remove("deliveryAttempt");
    headers.remove("scst_nativeHeadersPresent");
}
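The effect of this removal can be modeled with a plain Map standing in for Kafka's Headers type (an assumption for illustration only; the real method operates on org.apache.kafka.common.header.Headers). It also shows that a header under any name outside the never-headers list, such as a hypothetical message-id, survives untouched:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NeverHeadersDemo {
    // Same names that BinderHeaderMapper.removeNeverHeaders strips
    static final Set<String> NEVER_HEADERS =
            Set.of("id", "timestamp", "deliveryAttempt", "scst_nativeHeadersPresent");

    static void removeNeverHeaders(Map<String, byte[]> headers) {
        headers.keySet().removeAll(NEVER_HEADERS);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("id", "some-uuid".getBytes());
        headers.put("message-id", "some-uuid".getBytes()); // hypothetical alternative name
        removeNeverHeaders(headers);
        System.out.println(headers.keySet()); // [message-id] - "id" is gone
    }
}
```

This is why carrying the value under a name that is not on the never-headers list is a common workaround when the consumer must see it.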

As per my analysis, even if my application sets a simple Spring Message with an id header, the code above removes the id header from the Apache Kafka headers before the message is written to the Kafka topic, and the id value is deserialized as "None" by the consumer.

I would like to know if there is a way to avoid the removal of the id header from the Kafka producer record headers, as our project requires it.

Please let me know if you need further information.

Other references

4. Reference

DefaultKafkaHeaderMapper no longer adding data type for String data type · Issue #1346 · spring-projects/spring-kafka · GitHub

Reposted from:

Spring Kafka overriding custom header pattern and removes id from Producer Record Header · Issue #1803 · spring-projects/spring-kafka · GitHub


