KafkaProducer在多线程环境下是安全的,可以在多线程下使用

cilent代码

消息的发送分为3种

1.异步发送后不管结果
2.异步发送后,调用Future的get方法阻塞查看结果
3.异步回调函数不阻塞,针对失败和成功后进行处理

  • 序列化
    消息发送时需要序列化字节数组,需要实现serializer。生产者的序列化和消费者的反序列化应可以一一对应,才能够解析,kafka默认提供几种基本类型的接口
    image.png
public class KafkaProducerAnalysis {
    /**
     * 生产者逻辑
     * 1。配置生产者客户端参数及创建相应的生产者实例
     * 2。构建待发送消息
     * 3。发送消息
     * 4。关闭生产者实例
     */
    public static final String brokerList = "";//如果是集群可以用,进行分割,也可以只配置一个ip,后续kafka会自己处理
    public static final String topic = "topic-demo";

    public static final Properties initConfig() {
        Properties props = new Properties();
//        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("bootstrap.servers", brokerList);
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    /**
     * 消息发送分为3种
     * 1。发后即忘的异步
     * 2。同步
     * 3。异步
     *
     * @param args
     */
    public static void main(String[] args) {

//        Partitioner   分区器
//        ProducerInterceptor 拦截器
        Properties props = initConfig();
        //producer一般会负责多条消息的发送,需要调用close方法关闭回收资源,会阻塞等待之前的消息发送完成
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            //构建发送消息
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,Kafka!");
            try {
                /**
                 * 1。发后即忘
                 */
                producer.send(record);
                /**
                 * 2。同步
                 */
                RecordMetadata recordMetadata = producer.send(record).get();
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata recordMetadata1 = send.get();
                /**
                 * 3。异步回调函数
                 * 可以通过callback回调函数避免future做异步造成逻辑混乱。
                 * 注意此Callback为kafka的接口,不是JDK原生的
                 * kafka有响应时会做回调处理,要么成功,要么抛出异常
                 */
                producer.send(record, new Callback() {
                    @Override
                    /**
                     * 该方法的metadata和exception只有一个能为null,不可同时存在
                     * Callback回调可以保证分区有序,即在同一个分区2个消息发送结束
                     * 那么最先发送的肯定会最先执行callback函数
                     */
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            //TODO 做异常处理
                            exception.printStackTrace();
                        } else {
                            //正常发送
                        }
                    }
                });

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

ProducerRecord<K, V>

生产者发送的消息的包装类,可以定义topic、partition、headers、key、value、timestamp等内容
image.png
image.png

分区器

消息序列化后需要确定分区,如果ProducerRecord构造的时候确定分区字段则不需要再指定计算
image.png
其根据topic、k等数据信息通过patririon来计算分区

拦截器

1.在消息发送前做准备工作,拦截、定制化消息。
2.发送回调逻辑前调用
image.png

整体producer架构

生产者producer由2个线程组成

  • 主线程
    创建消息、拦截器、序列化起、分区器等作用,缓存到消息累加器RecordAccumulator队列中,减少网络I/O的消耗
    image.png

  • Sender线程
    存消息从Deque尾部插入,取消息从Deque头部读取
    image.png

元数据的更新

  • 指Kafka集群的元数据,记录了集群中有哪些主题,主题有哪些分区,每个分区的leader和follower副本分配的节点、ISR、AR等
  • 客户端中没有需要的元数据信息则会更新,通过metadata.max.age.ms的值进行更新,默认为5分钟
  • 更新是在客户端内部进行的,对外不可见
  • Sender线程负责更新
  • 主线程会读取信息,通过synchronized和final来保证多线程安全

生产者的参数

acks

用于指定分区中必须要有多少个副本收到消息,生产者才会认为是成功写入的,涉及到可靠性和吞吐量的权衡

  • acks=1 默认值
    leader副本成功写入消息,就会响应成功,此时如果leader崩溃,重新选举后其他ldeadr中没有消息,会丢失,折中的方案
  • acks=0
    生产者发送消息后不等待服务器响应直接认为成功,吞吐量最大
  • acks=-1或all
    需要ISR中的所有leader、follower副本都写入才响应成功,最强可靠性。但当ISR中只有leader时就退化为acks=1的情况。需要和replicas参数配合才能达到使用效果

其他参数

  • max.request.size
    用来限制生产者能发送的消息最大值,默认为1MB。和broker端的message.max.byte参数要配合使用,否则如果客户端发送大于服务端则会报错
  • retries
    配置生产者的重试次数,默认为0,即发生异常时不进行重试
  • retries.backoff.ms
    两次重试之间的间隔,避免无效的频繁重试,默认为100ms
  • compression.type
    指定消息压缩方式,默认不压缩为none,可为gzip、snappy
  • connection.max.idle.ma
    闲置多久后关闭连接,默认9分钟
  • linger.ms
    发送ProducerBatch之前等待消息继续追加的时间,默认为0。即会在ProducerBatch充满或等待超过linger.ms后发送出去。
    增加值会造成消息的延迟,但会提升网络I/O的吞吐量
  • revice.buffer.bytes
    设置接受Socket缓冲区大小,默认为32KB,如果为-1则使用系统默认值
  • send.buffer.bytes
    设置发送Socket缓冲区大小,默认为128KB,为-1则为系统默认
  • request.timeout.ms
    配置Producer等待请求响应的最长时间,默认为30000ms。

这个家伙很懒,啥也没有留下😋