Kafka官网 https://kafka.apache.org/
分布式流式处理平台、高吞吐、可持久化、水平扩展、支持流数据处理
- 消息系统
系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性、消息顺序性、回溯消费 - 存储系统
消息持久化功能、多副本机制、可作为长期的数据存储系统使用 - 流式处理平台
完整的流式处理类库、窗口、连接、聚合等
基本概念
Kafka体系包含Producer、Broker、Consumer和外部Zookeeper做集群元数据、选举等操作
- Producer:clinet生产者,将消息发送到Broker
- Broker:server实例,服务代理节点,将收到的消息存储到磁盘
- Consumer:client消费者,从Broker订阅消费消息并处理相应业务逻辑
topic主题、partition分区
- topic
topic是逻辑划分,可以细分为多个partition区
分区规则合理,则消息可以均匀地分配到不同的分区broker中,减少I/O瓶颈 - partition
同一个topic的patition可以分布在不同的broker上
一个partition只属于一个topic
同一个topic下不同partition的消息不同 - offset
offset为消息被追加到分区后分配的偏移量offset递增号
offset在同一个partition种保证消息顺序性
offset不跨越partition,所以不保证同一个topic中的顺序性
副本Replica
提高容灾能力,同一分区的不同副本保存相同信息(同一时刻,副本之间不是完全一样),一主多从。
- leader负责读写、follower负责同步信息
- AR(Assigned Replicas)分区中的所有副本
- ISR(In-Sync Replicas)与leader副本保持一定程度(可以通过参数配置)同步的副本,ISR是AR的一个子集
- OSR(Out-of-Sync Replicas)与leader相比滞后过多的follwer
- AR=ISR+OSR
通过引入ISR,当ISR中的leader消息被写入,且所有的ISR中也同步后leader的高水位线会改变,此时client可以拉取
安装、配置
- 安装zookeeper
官网地址 https://zookeeper.apache.org/releases.html
修改zookeeper的相关配置 - 安装kafka
官网地址 https://kafka.apache.org/downloads
修改server.properties配置
broker.id 配置编号
listeners 配置内网IP
advertised.listeners 配置外网IP
log.dirs 配置日志目录
zookeeper.connect 配置zookeeperIP
advertised.listeners和listeners的端口相同
后台以集群启动kafka,需要修改启动端口和id
./kafka-server-start.sh -daemon ../config/server.properties
查看相应日志是否正常
#生产者消费者代码
- 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
- 生产者
static String brokerList = "";//IP端口
static String topic = "topic-demo";
public static void main(String[] args) {
// new StringSerializer()
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
//配置客户端生产者参数
// 注意producer需要close
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
//构建发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
- 消费者
public class ConsumerFastStart {
static String brokerList = "";//IP端口
static String topic = "topic-demo";
static String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);
//创建客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singleton(topic));
//循环消费
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(v-> System.out.println(v.value()));
}
}
}
Comments | 0 条评论