KafkaProducer在多线程环境下是安全的,可以在多线程下使用
cilent代码
消息的发送分为3种
1.异步发送后不管结果
2.异步发送后,调用Future的get方法阻塞查看结果
3.异步回调函数不阻塞,针对失败和成功后进行处理
- 序列化
消息发送时需要序列化字节数组,需要实现serializer。生产者的序列化和消费者的反序列化应可以一一对应,才能够解析,kafka默认提供几种基本类型的接口
public class KafkaProducerAnalysis {
/**
* 生产者逻辑
* 1。配置生产者客户端参数及创建相应的生产者实例
* 2。构建待发送消息
* 3。发送消息
* 4。关闭生产者实例
*/
public static final String brokerList = "";//如果是集群可以用,进行分割,也可以只配置一个ip,后续kafka会自己处理
public static final String topic = "topic-demo";
public static final Properties initConfig() {
Properties props = new Properties();
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("bootstrap.servers", brokerList);
props.put("client.id", "producer.client.id.demo");
return props;
}
/**
* 消息发送分为3种
* 1。发后即忘的异步
* 2。同步
* 3。异步
*
* @param args
*/
public static void main(String[] args) {
// Partitioner 分区器
// ProducerInterceptor 拦截器
Properties props = initConfig();
//producer一般会负责多条消息的发送,需要调用close方法关闭回收资源,会阻塞等待之前的消息发送完成
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
//构建发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,Kafka!");
try {
/**
* 1。发后即忘
*/
producer.send(record);
/**
* 2。同步
*/
RecordMetadata recordMetadata = producer.send(record).get();
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata1 = send.get();
/**
* 3。异步回调函数
* 可以通过callback回调函数避免future做异步造成逻辑混乱。
* 注意此Callback为kafka的接口,不是JDK原生的
* kafka有响应时会做回调处理,要么成功,要么抛出异常
*/
producer.send(record, new Callback() {
@Override
/**
* 该方法的metadata和exception只有一个能为null,不可同时存在
* Callback回调可以保证分区有序,即在同一个分区2个消息发送结束
* 那么最先发送的肯定会最先执行callback函数
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
//TODO 做异常处理
exception.printStackTrace();
} else {
//正常发送
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
ProducerRecord<K, V>
生产者发送的消息的包装类,可以定义topic、partition、headers、key、value、timestamp等内容
分区器
消息序列化后需要确定分区,如果ProducerRecord构造的时候确定分区字段则不需要再指定计算
其根据topic、k等数据信息通过patririon来计算分区
拦截器
1.在消息发送前做准备工作,拦截、定制化消息。
2.发送回调逻辑前调用
整体producer架构
生产者producer由2个线程组成
-
主线程
创建消息、拦截器、序列化起、分区器等作用,缓存到消息累加器RecordAccumulator队列中,减少网络I/O的消耗
-
Sender线程
存消息从Deque尾部插入,取消息从Deque头部读取
元数据的更新
- 指Kafka集群的元数据,记录了集群中有哪些主题,主题有哪些分区,每个分区的leader和follower副本分配的节点、ISR、AR等
- 客户端中没有需要的元数据信息则会更新,通过metadata.max.age.ms的值进行更新,默认为5分钟
- 更新是在客户端内部进行的,对外不可见
- Sender线程负责更新
- 主线程会读取信息,通过synchronized和final来保证多线程安全
生产者的参数
acks
用于指定分区中必须要有多少个副本收到消息,生产者才会认为是成功写入的,涉及到可靠性和吞吐量的权衡
- acks=1 默认值
leader副本成功写入消息,就会响应成功,此时如果leader崩溃,重新选举后其他ldeadr中没有消息,会丢失,折中的方案 - acks=0
生产者发送消息后不等待服务器响应直接认为成功,吞吐量最大 - acks=-1或all
需要ISR中的所有leader、follower副本都写入才响应成功,最强可靠性。但当ISR中只有leader时就退化为acks=1的情况。需要和replicas参数配合才能达到使用效果
其他参数
- max.request.size
用来限制生产者能发送的消息最大值,默认为1MB。和broker端的message.max.byte参数要配合使用,否则如果客户端发送大于服务端则会报错 - retries
配置生产者的重试次数,默认为0,即发生异常时不进行重试 - retries.backoff.ms
两次重试之间的间隔,避免无效的频繁重试,默认为100ms - compression.type
指定消息压缩方式,默认不压缩为none,可为gzip、snappy - connection.max.idle.ma
闲置多久后关闭连接,默认9分钟 - linger.ms
发送ProducerBatch之前等待消息继续追加的时间,默认为0。即会在ProducerBatch充满或等待超过linger.ms后发送出去。
增加值会造成消息的延迟,但会提升网络I/O的吞吐量 - revice.buffer.bytes
设置接受Socket缓冲区大小,默认为32KB,如果为-1则使用系统默认值 - send.buffer.bytes
设置发送Socket缓冲区大小,默认为128KB,为-1则为系统默认 - request.timeout.ms
配置Producer等待请求响应的最长时间,默认为30000ms。
Comments | 0 条评论