Three Ways to Send Messages with Kafka

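This article summarizes three ways to send messages with the Kafka producer: fire and forget, synchronous send, and send with a callback. I hope it is a useful reference; if you spot mistakes or anything I have overlooked, corrections are welcome.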

Ways to send a message with Kafka

package com.zl.kafkademo;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Properties;

/**
 * Demonstrates three ways to send a message with the Kafka producer.
 * Date: 2019/4/23 22:05
 *
 * @author le
 */
public class MyProducer implements Job {

    // One producer shared by all three send methods and by every scheduled run.
    private static final KafkaProducer<String, String> producer;

    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Method 1: fire and forget. Send the record and ignore the result.
     */
    private static void sendMessageForgetResult() {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study", "name", "Forget_result");
        producer.send(record);
        // Flush instead of close: the producer is shared, and closing it here
        // would make every later send fail.
        producer.flush();
    }

    /**
     * Method 2: synchronous send. Block until the broker acknowledges the record.
     *
     * @return metadata (topic, partition, offset) of the stored record
     * @throws Exception if the send fails or the wait is interrupted
     */
    private static RecordMetadata sendMessageSync() throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study", "name", "sync");
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());
        System.out.println(result.partition());
        System.out.println(result.offset());
        return result;
    }

    /**
     * Method 3: asynchronous send. The callback runs once the send completes.
     */
    private static void sendMessageCallback() {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study", "name", "callback");
        producer.send(record, new MyProducerCallback());
    }

    // Scheduled job: Quartz calls this on every trigger firing.
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            sendMessageSync();
        } catch (Exception e) {
            System.out.println("error:" + e);
        }
    }

    private static class MyProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // A non-null exception means the send failed and there is no usable metadata.
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }

    public static void main(String[] args) {
        // sendMessageForgetResult();
        // sendMessageCallback();
        JobDetail job = JobBuilder.newJob(MyProducer.class).build();
        // Fire the job once per second, forever.
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever())
                .build();
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.scheduleJob(job, trigger);
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}
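
The three methods differ only in what the caller does with the Future that send() returns. The following is a rough, broker-free sketch of that difference using plain JDK classes. Note that fakeSend is a hypothetical stand-in for producer.send and is not part of the Kafka API; the class and method names are illustrative only. It completes on a background thread, loosely the way the producer's I/O thread would.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

/** Broker-free illustration of the three send patterns; nothing here is Kafka API. */
final class SendPatterns {

    /** Hypothetical stand-in for producer.send(record): completes asynchronously. */
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (value.isEmpty()) {
                throw new IllegalArgumentException("empty value");
            }
            return "kafka-study:" + value; // pretend metadata: topic plus value
        });
    }

    /** 1. Fire and forget: the returned future is ignored, so a failure goes unseen. */
    static void fireAndForget(String value) {
        fakeSend(value);
    }

    /** 2. Synchronous: block on get(); a failure arrives wrapped in ExecutionException. */
    static String sendSync(String value) {
        try {
            return fakeSend(value).get();
        } catch (ExecutionException e) {
            // The real reason for the failure is the cause, not the wrapper.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** 3. Callback: return at once; the callback later receives a result or an error. */
    static CompletableFuture<String> sendWithCallback(String value,
                                                      BiConsumer<String, Throwable> callback) {
        return fakeSend(value).whenComplete(callback);
    }

    public static void main(String[] args) {
        fireAndForget("Forget_result");
        System.out.println(sendSync("sync"));
        System.out.println(sendSync(""));
        // join() is only here so the demo does not exit before the callback runs.
        sendWithCallback("callback",
                (meta, err) -> System.out.println(err == null ? meta : "error: " + err)).join();
    }
}
```

The same trade-off applies to the real producer: fire and forget is the fastest but can lose messages silently, the synchronous send is the safest but waits for each record in turn, and the callback keeps throughput while still reporting failures.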

Required dependencies

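<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.0</version>
</dependency>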

How to test

Commands on macOS

1. Create the topic (run from Kafka's bin directory):

./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

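2. Run the program above, which starts the scheduled job and sends one message per second.

3. Check that the messages can be consumed: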

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning
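
Step 2 relies on Quartz to call the send method once a second. If that is more machinery than a quick test needs, the JDK's ScheduledExecutorService can drive the same loop. This is a minimal sketch of that swapped-in approach, not the setup used above: RepeatingSender and runRepeatedly are illustrative names, and the Runnable stands in for a call such as sendMessageSync().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** JDK-only alternative to the Quartz trigger; the names here are illustrative. */
final class RepeatingSender {

    /**
     * Runs task every periodMillis until it has run the given number of times.
     * Returns true if all runs finished within timeoutMillis.
     */
    static boolean runRepeatedly(Runnable task, int times, long periodMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    // Skip any extra tick that fires before shutdown takes effect.
                    if (remaining.getCount() > 0) {
                        task.run();
                    }
                } catch (RuntimeException e) {
                    // An uncaught exception would cancel all later runs, so log it instead.
                    System.out.println("error:" + e);
                } finally {
                    remaining.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for sendMessageSync(); prints instead of talking to a broker.
        boolean finished = runRepeatedly(() -> System.out.println("send"), 3, 1000, 10_000);
        System.out.println("finished=" + finished);
    }
}
```

Catching exceptions inside the scheduled task plays the same role as the try/catch in execute(): one failed send should not stop the remaining runs.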

Commands on Windows

1. Go to D:\zookeeper-3.4.14\bin, open a new cmd window, and enter "zkServer" to start ZooKeeper.

2. Go to D:\kafka_2.11-0.11.0.0, open cmd, and start the Kafka broker:

.\bin\windows\kafka-server-start.bat .\config\server.properties

3. Create a topic

From D:\kafka_2.11-0.11.0.0, open cmd and enter:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List the topics that have been created:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

Show the details of a specific topic:

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

Consume the topic and view its messages:

.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning

Summary

The above is based on my own experience; I hope it serves as a useful reference.
