发布于: 2023-10-24 ,更新于:2024-07-05
kafka安装文档
关闭防火墙
1 systemctl stop firewalld
需安装Zookeeper组件具体要求同Zookeeper任务要求,并与Kafka环境适配,启动Zookeeper并截图保存结果:
在master,slave1,slave2分别启动zookeeper
zookeeper 配置文件 /opt/software/zookeeper/conf/zoo.cfg
,数据文件夹设置为 /opt/software/zookeeper/zkdata
,zoo.cfg
需要和 kafka 的 zookeeper.properties
适配
1 vi /opt/module/zookeeper/conf/zoo.cfg
需要查看的配置内容:
1 2 3 4 5 dataDir=/opt/module/zookeeper/zkdata clientPort=2181 server.1=master:2888:3888 server.2=slave1:2888:3888 server.3=slave2:2888:3888
解压kafka安装包到 /opt/module
路径,并修改解压后文件夹名为 kafka ,截图并保存结果:
1 2 tar -xvzf /opt/software/kafka1.0.0.tar.gz -C /opt/module mv /opt/module/kafka_2.11-1.0.0 /opt/module/kafka
设置kafka环境变量, 并使环境变量只对当前root用户生效,截图并保存结果:
添加以下内容:
1 2 export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
刷新生效:
1 source /root/.bash_profile
修改kafka相应文件, 截图并保存结果:
在 master 上修改文件zookeeper.properties
1 vi /opt/module/kafka/config/zookeeper.properties
配置文件内容:
1 2 3 4 5 clientPort=2181 dataDir=/opt/module/kafka/data server.1=master:2888:3888 server.2=slave1:2888:3888 server.3=slave2:2888:3888
在kafka安装文件夹中建立 data 文件夹,与 dataDir=/opt/module/kafka/data
对应
1 2 mkdir /opt/module/kafka/datavi /opt/module/kafka/data/myid
在 master 上修改 server.properties
1 vi /opt/module/kafka/config/server.properties
修改以下内容:
1 2 3 4 5 broker.id =0 listeners =PLAINTEXT://192.168.152.240:9092 advertised.listeners =PLAINTEXT://192.168.152.240:9092 log.dirs =/usr/local/src/kafka/data zookeeper.connect =192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
文件分发:
1 2 3 4 scp -r /opt/module/kafka slave1:/opt/module scp -r /opt/module/kafka slave2:/opt/module scp /root/.bash_profile slave1:/root scp /root/.bash_profile slave2:/root
在其他节点刷新变量文件生效:
1 source /root/.bash_profile
1 2 3 cat >/opt/module/kafka/data/myid<<EOF 2 EOF
修改 server.properties
文件为以下内容:
1 2 3 4 broker.id=1 listeners=PLAINTEXT://192.168.152.241:9092 advertised.listeners=PLAINTEXT://192.168.152.241:9092 log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
在 slave2 上修改 myid
文件为 3
1 2 3 cat >/opt/module/kafka/data/myid<<EOF 3 EOF
修改 server.properties
文件为以下内容:
1 2 3 4 broker.id=1 listeners=PLAINTEXT://192.168.152.242:9092 advertised.listeners=PLAINTEXT://192.168.152.242:9092 log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
启动 kafka 并保存命令输出结果, 截图并保存结果:
分别在master, slave1, slave2 上执行
1 2 3 kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties jps
输出以下内容为正常启动:
1 2 3 2198 QuorumPeerMain 6732 Jps 3758 Kafka
创建指定topic, 截图并保存结果:
1 kafka-topics.sh --zookeeper master01:2181,slave01:2181,slave02:2181 --create --partitions 3 --replication-factor 3 --topic test
输出以下内容为正常启动:
1 Created topic "test-topic" .
查看所有topic信息, 并截图保存结果:
1 kafka-topics.sh --list --zookeeper master01:2181
输出:
1 kafka-topics.sh -zookeeper master01:2181,slave01:2181,slave02:2181 --describe --topic test
输出:
1 2 3 4 Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs: Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: test-topic Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1
启动指定生产者(producer), 并截图保存结果:
注意: 控制台发送者的broker-list的ip需要与server.properties中listeners处的一致, 控制台消费者就能接收到消息
1 kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
启动消费者(sonsumer), 并截图保存结果:
再开启一个master连接, 输入:
1 kafka-console-consumer.sh --bootstrap-server 192.168.152.240:9092 --topic test
注: kafka旧版本连接服务器的参数为 --zookeeper
, 新版为 --bootstrap-server
输出:
1 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
测试生产者(producer), 并截图保存结果:
1 kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
1 2 3 4 5 6 7 8 9 [root@master01 ~] >111 >222 >333 >444 >555 >666 >777 >888
测试消费者(consumer), 并截图保存结果
消费者上显示:
1 2 3 4 5 6 7 8 9 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 111 222 333 444 555 666 777 888
基于命令行方式使用kafka
1. 三台机器上启动zookeeper
zkServer.sh start
2. 三台机器上启动kafka服务
1 kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
3. 查看进程
4. 创建主题
1 2 3 4 5 kafka-topics.sh --create \ --topic itcasttopic \ --partitions 3 \ --replication-factor 2 \ --zookeeper master01:2181,slave01:2181,slave02:2181
5. 创建生产者,来生产消息。在master01上输入
1 2 3 kafka-console-producer.sh \ --broker-list master01:9092,slave01:9092,slave02:9092 \ --topic itcasttopic
6. 创建消费者,来消费消息。在slave01上输入
1 2 3 kafka-console-consumer.sh \ --from-beginning \ --topic itcasttopic
测试消费者(consumer)(kafka2.11-2.0.0命令有所不同)
1 2 3 4 kafka-console-consumer.sh \ --from-beginning \ --topic test \ --bootstrap-server 192.168.152.245:9092