在使用过程中,发现 安装kafka_2.12-2.7 时java客户端无法连接kafka, 更换kafka_2.13-2.7 后,java客户端可以连接kafka。
三个主要功能
概念
5个核心api
topic与 logs
对于每个主题,Kafka集群维护一个分区日志,如下所示:
数据复制与容错
分区分布在Kafka群集中的服务器上,每个服务器处理数据并要求共享分区。每个分区都在可配置数量的服务器之间复制,以实现容错功能。每个分区有一个leader,有0-多个followers,leader处理所有的读写请求,followers被动的复制leader数据,leader故障后,follower按照规则升级为leader。
消费者
[root@localhost ~]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
[root@k8s-n1 ~]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
IP | hostname&app |
---|---|
192.168.0.126 | k8s-n2& zookeeper、kafka |
192.168.0.127 | k8s-n3& zookeeper、kafka |
192.168.0.128 | k8s-n4& zookeeper、kafka |
[root@k8s-n2 ~]# tar xf kafka_2.12-2.5.0.tgz
[root@k8s-n2 ~]# mv kafka_2.12-2.5.0 /data/kafka
[root@k8s-n2 ~]# cd /data/kafka
[root@k8s-n2 kafka]# mkdir -pv /data/kafka/logs
#修改配置文件(主要修改log.dirs、zookeeper.connect;broker.id三台机器不能相同)
[root@k8s-n2 kafka]# cat config/server.properties | egrep -v '^#|^$'
broker.id=0
#broker.id=1 #192.168.0.127机器id设置
#broker.id=2 #192.168.0.128机器id设置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.126:2181,192.168.0.127:2181,192.168.0.128:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#启动(3节点都启动)
[root@k8s-n2 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@k8s-n2 kafka]# jps
1361 QuorumPeerMain
28581 Jps
27815 Kafka
#关闭命令./bin/kafka-server-stop.sh
至此centos7 kafka安装过程结束
#创建topic
[root@k8s-n2 kafka]# ./bin/kafka-topics.sh --create --bootstrap-server 192.168.0.126:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic my-replicated-topic.
#查看topci
[root@k8s-n2 kafka]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segmen
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,
[root@k8s-n2 kafka]#
#启动一个生产者,发送消息
[root@k8s-n2 kafka]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^C
#启动一个消费者消费消息
[root@k8s-n2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-inning --topic my-replicated-topic
my test message 1
my test message 2
^C
#杀死一个kafka
[root@k8s-n2 kafka]# jps
1361 QuorumPeerMain
17109 Jps
30809 Kafka
[root@k8s-n2 kafka]# kill 30809
[root@k8s-n2 kafka]# jps
1361 QuorumPeerMain
17401 Jps
#在另外一台机器查看,Isr同步的只有2个节点
[root@k8s-n3 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1
#再发送消息
[root@k8s-n2 kafka]# bin/kafka-console-producer.sh --bootstrap-server 192.168.0.127:9092 --topic my-replicated-topic
>111
>222
>^C
[root@k8s-n2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.127:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
111
222
^C
#启动kafka,再次查看Isr状态,很快恢复
[root@k8s-n2 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@k8s-n2 kafka]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
jdk 支持8和11版本,这里使用jdk8,jdk安装过程省略
[root@localhost data]# tar xf kafka_2.12-2.7.0.tgz
[root@localhost data]# cd kafka_2.12-2.7.0/
[root@localhost kafka_2.12-2.7.0]# ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
##关闭zk命令使用 ./bin/zookeeper-server-stop.sh -daemon ./config/zookeeper.properties
#启动kafka
[root@localhost kafka_2.12-2.7.0]# ./bin/kafka-server-start.sh -daemon ./config/server.properties
###关闭kafka命令./bin/kafka-server-stop.sh ./config/server.properties