亚洲免费在线视频-亚洲啊v-久久免费精品视频-国产精品va-看片地址-成人在线视频网

您的位置:首頁技術文章
文章詳情頁

Java kafka如何實現自定義分區類和攔截器

瀏覽:57日期:2022-08-31 13:14:07

生產者發送到對應的分區有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:

1、實現一個自定義分區類,CustomPartitioner實現Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當前的發送的topic * @param key 當前的key值 * @param keyBytes 當前的key的字節數組 * @param value 當前的value值 * @param valueBytes 當前的value的字節數組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據返回值就是分區號, 這邊就是固定發送到三號分區 return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區類

// 具體的分區類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個是消息發送之前進行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創建一個新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個是生產者回調函數調用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計數攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計數過濾器不對消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統計成功和失敗的次數 System.out.println('CounterInterceptor過濾器執行統計失敗和成功數量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務端的主機名和端口號 props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節點的應答 props.put('acks', 'all'); // 消息發送最大嘗試次數 props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請求延時,可能生產數據太快了 props.put('linger.ms', 1); // 發送緩存區內存大小,數據是先放到生產者的緩沖區 props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調函數');} }); } /*System.out.println('現在執行關閉producer'); producer.close();*/ producer.close(); }}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Java
相關文章:
主站蜘蛛池模板: 狠狠狠狠狠 | 亚洲国产最新 | 欧洲一级毛片 | 手机在线观看亚洲国产精品 | 99爱在线观看精品视频 | 手机看片免费基地 | 欧美les视频xxxx在线观看 | 怡红院免费全部视频在线视频 | 亚洲欧美日韩综合一区久久 | 男女视频免费看 | 亚洲国产日韩精品 | 特黄特色三级在线观看 | 国产精品莉莉欧美自在线线 | 91视频站| 美女被cao免费看在线看网站 | 免费观看大片毛片 | 九九色视频| 中国嫩模一级毛片 | 亚洲日韩中文字幕天堂不卡 | 亚洲精品一区二区三区四 | 午夜精品一区二区三区在线观看 | 免费萌白酱国产一区二区三区 | 粉嫩jk制服美女啪啪 | 日本视频免费在线播放 | 久久精品中文字幕不卡一二区 | 久久久久免费精品国产 | 精品国产免费第一区二区三区日韩 | 免费高清毛片在线播放视频 | 亚洲美女高清aⅴ视频免费 亚洲美女黄色片 | 欧美日韩国产va另类 | 亚洲无吗 | 日韩国产片| 国产大乳喷奶水在线看 | fefe66免费毛片你懂的 | 日韩偷拍自拍 | 欧美一级俄罗斯黄毛片 | 国产精品极品美女自在线看免费一区二区 | 日韩免费一区二区三区在线 | 那里有黄色网址 | 亚洲经典乱码在线播 | 亚洲精品456在线播放无广告 |