色综合图-色综合图片-色综合图片二区150p-色综合图区-玖玖国产精品视频-玖玖香蕉视频

您的位置:首頁技術文章
文章詳情頁

spring 整合kafka監聽消費的配置過程

瀏覽:3日期:2023-07-20 15:01:58
前言

最近項目里有個需求,要消費kafka里的數據。之前也手動寫過代碼去消費kafka數據。但是轉念一想。既然spring提供了消費kafka的方法。就沒必要再去重復造輪子。于是嘗試使用spring的API。

項目技術背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。

整合過程

1. 引入spring-kafka的依賴包

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>

2. 在spring的xml文件里增加配置項,也可以單獨創建一個spring-context-XX.xml文件。

<!-- consumer configuration 該配置項可以根據自己業務的實際需求做增加或刪除--> <bean class='java.util.HashMap'> <constructor-arg> <map><entry key='bootstrap.servers' value='${kafka.bootstrap.servers}' /><entry key='group.id' value='group' /><entry key='enable.auto.commit' value='true' /><entry key='auto.commit.interval.ms' value='3000' /><entry key='session.timeout.ms' value='10000' /><entry key='key.deserializer' value='org.apache.kafka.common.serialization.StringDeserializer' /><entry key='value.deserializer' value='org.apache.kafka.common.serialization.StringDeserializer' /> </map> </constructor-arg> </bean> <!-- create factory 該類是spring jar包里提供,就這么配置--> <bean class='org.springframework.kafka.core.DefaultKafkaConsumerFactory'> <constructor-arg> <ref bean='consumerProperties' /> </constructor-arg> </bean> <!-- 自定義的消費類,需要實現spring的接口 --> <bean /> <!-- 該類也是jar包里提供的,注入的監聽類是自己定義的,topic名稱是配置文件引入的--> <bean class='org.springframework.kafka.listener.ContainerProperties'> <constructor-arg name='topics' value='${kafka.paypal.topic.name}'/> <property name='messageListener' ref='payPalConsumer' /> </bean> <!-- 改類也是jar里提供的,把這個containerProperties和consumerfactory 注入 --> <bean init-method='doStart'> <constructor-arg ref='consumerFactory' /> <constructor-arg ref='containerProperties' /> </bean>

2. 自定義消費者類,消費者類依然可以使用注解。

/** * get msg from kafka */@Component public class PayPalConsumer implements MessageListener<String, String> { private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class); @Autowired private XXService XXService; @Override public void onMessage(ConsumerRecord<String, String> authorizeRecord) { String value = authorizeRecord.value(); if (StringUtils.isEmpty(value)){ logger.warn('receive message from kafka is null'); return; } logger.info('receive message from kafka is {}',value); }}

使用這個步驟配置,一次性過。非常順利。

到此這篇關于spring 整合kafka監聽消費的配置過程的文章就介紹到這了,更多相關spring 整合kafka內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
主站蜘蛛池模板: 日韩毛片免费视频一级特黄 | 欧美精品午夜毛片免费看 | 中文三 级 黄 色 片 | 一级成人| 日本一区二区三区欧美在线观看 | 男女男在线精品网站免费观看 | 国产在线综合视频 | 性刺激久久久久久久久 | 久久综久久美利坚合众国 | 日韩毛片高清在线看 | 在线视频中文 | 久久中文字幕久久久久 | 成人国产欧美精品一区二区 | 欧美在线观看视频一区 | 日韩美女网站在线看 | 久久99国产精品视频 | 国产短视频精品一区二区三区 | 一级毛片私人影院免费 | 最近免费手机中文字幕3 | 四色永久 | 欧美成人性毛片免费版 | 老司机成人免费精品视频 | 美女视频黄视大全视频免费网址 | 亚洲欧美日韩在线播放 | 久久精品99 | 狼人总合狼人综合 | 中文字幕va一区二区三区 | 国产精品v免费视频 | 美美女高清毛片视频免费观看 | 亚洲精品国产高清不卡在线 | 欧美综合自拍亚洲综合百度 | 一级毛片美国一级j毛片不卡 | 国产一区影视 | 欧美japanese孕交 | 日本肥老妇色xxxxx日本老妇 | 牛人盗摄一区二区三区视频 | 99久久免费国产精品 | 国产a一级毛片含羞草传媒 国产a自拍 | 久久综合中文字幕一区二区 | 国产午夜精品久久理论片 | 精品视频在线免费看 |