Spring Boot集成Kafka的示例代码

本篇文章主要介绍了Spring Boot集成Kafka的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记

系统环境

使用远程服务器上搭建的kafka服务

  1. Ubuntu 16.04 LTS
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

   4.0.0com.laravelshao.springbootspring-boot-integration-kafka0.0.1-SNAPSHOTjarspring-boot-integration-kafkaDemo project for Spring Boot org.springframework.bootspring-boot-starter-parent2.0.0.RELEASE UTF-8UTF-81.8  org.springframework.bootspring-boot-starter org.springframework.kafkaspring-kafka org.springframework.bootspring-boot-starter-json org.springframework.bootspring-boot-starter-testtest   org.springframework.bootspring-boot-maven-plugin

2.添加配置信息,这里使用yml文件

 spring: kafka: bootstrap-servers:X.X.X.X:9092 producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: group-id: test auto-offset-reset: earliest value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: com.laravelshao.springboot.kafka 

3.创建消息对象

 public class Message { private Integer id; private String msg; public Message() { } public Message(Integer id, String msg) { this.id = id; this.msg = msg; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "Message{" + "id=" + id + ", msg='" + msg + '\'' + '}'; } } 

4.创建生产者

 package com.laravelshao.springboot.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * Created by shaoqinghua on 2018/3/23. */ @Component public class Producer { private static Logger log = LoggerFactory.getLogger(Producer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, Message message) { kafkaTemplate.send(topic, message); log.info("Producer->topic:{}, message:{}", topic, message); } } 

5.创建消费者,使用@ KafkaListener注解监听主题

 package com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * Created by shaoqinghua on 2018/3/23. */ @Component public class Consumer { private static Logger log = LoggerFactory.getLogger(Consumer.class); @KafkaListener(topics = "test_topic") public void receive(ConsumerRecord consumerRecord) { log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value()); } } 

6.发送消费测试

 package com.laravelshao.springboot; import com.laravelshao.springboot.kafka.Message; import com.laravelshao.springboot.kafka.Producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; @SpringBootApplication public class IntegrationKafkaApplication { public static void main(String[] args) throws InterruptedException { ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args); Producer producer = context.getBean(Producer.class); for (int i = 1; i <10; i++) { producer.send("test_topic", new Message(i, "test topic message " + i)); Thread.sleep(2000); } } } 

可以依次看到发送消息,消费消息

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

 spring: kafka: consumer: properties: spring: json: trusted: packages: com.laravelshao.springboot.kafka 

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持html中文网。

以上就是Spring Boot集成Kafka的示例代码的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » Java