《Kafka环境配置》

《Kafka环境配置》


title: 《Kafka环境配置》
date: 2022-6-7 21:20:41
description: Spark (HA) Spark (Yarn)

阅读全文

**


kafka 安装配置
Kafka是一种高吞吐量的分布式发布订阅消息系统,其在大数据开发应用上的目的是通过 Hadoo的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。大数据开发需掌握Kafka架构原理及各组件的作用和使用方法及相关功能的实现。

上传文件包 到/export/server/
解压文件

tar -zxvf kafka_2.11-2.0.0.tgz

创建软连接

ln -s kafka_2.11-2.0.0/ kafka

进入 /export/server/kafka/config 修改 配置文件 server.properties

cd /export/server/kafka/config vim server.properties

(1)21 行内容 broker.id=0 为依次增长的:0、1、2、3、4,集群中唯一 id 从0开始,每台不能重复
(注:此处不用修改)

21 broker.id=0

(2)31 行内容 #listeners=PLAINTEXT://:9092 取消注释,内容改为:
listeners=PLAINTEXT://master:9092
PLAINTEXT为通信使用明文(加密ssl)

31 listeners=PLAINTEXT://master:9092

(3)59 行内容 log.dirs=/tmp/kafka-logs 为默认日志文件存储的位置,改为
log.dirs=/export/server/data/kafka-logs

59 log.dirs=/export/data/kafka-logs

(4)63 行内容为 num.partitions=1 是默认分区数

63 num.partitions=1

(5)76 行部分

############################ Log Flush Policy ###############################

数据安全性(持久化之前先放到缓存上,从缓存刷到磁盘上)interval.messages interval.ms

(6)93 行部分

########################### **Log Retention Policy**############################ 
数据保留策略 168/24=7,1073741824/1024=1GB,300000ms = 300s = 5min超过了删掉 (最后修改时间还是创建时间-->日志段中最晚的一条消息,维护这个最大的时间戳-->用户无法 干预

(7)121 行内容 zookeeper.connect=localhost:2181 修改为zookeeper.connect=master:2181,slave1:2181,slave2:2181

121 zookeeper.connect=master:2181,slave1:2181,slave2:2181

(8)126 行内容 group.initial.rebalance.delay.ms=0 修改为group.initial.rebalance.delay.ms=3000

133 group.initial.rebalance.delay.ms=3000

(9)给 slaves1和 slavs2 scp 分发 kafka
cd /export/server/

scp -r /export/server/kafka_2.11-2.0.0/ slave1:$PWD
scp -r /export/server/kafka_2.11-2.0.0/ slave2:$PWD

(10)创建软连接

ln -s /export/server/kafka_2.11-2.0.0/ kafka

(11)配置 kafka 环境变量(注:可以一台一台配,也可以在 master 完成后发给 slave1 和slave2)

vim /etc/profile # kafka 环境变量 export KAFKA_HOME=/export/server/kafka export PATH=$PATH:$KAFKA_HOME/bin

(12)重新加载环境变量

source /etc/profile

(13)分别在 slave1 和slave2 上修改配置文件 路径:/export/server/kafka/config
将文件 server.properties 的第 21 行的 broker.id=0 修改为 broker.id=1 同理 slave2 同样操

21 broker.id=1

(14)将文件 server.properties 的第 31 行的 listeners=PLAINTEXT://master:9092 修改为
listeners=PLAINTEXT://slave1:9092 同理slave2 同样操作

31 listeners=PLAINTEXT://slave1:9092

(15)启停 kafka (注:kafka 启动需要在 zookeeper 启动的情况下才可)

kafka-server-start.sh -daemon /export/server/kafka/config/server.properties

hadoop,zookeeper,kafka启动
结果显示:

(base) [root@master ~]# jps
11793 NodeManager
91699 Kafka
85618 QuorumPeerMain
10697 NameNode
10924 DataNode
11596ResourceManager
109852 Jps

[root@slave1 ~]# jps
9301 DataNode
9493 SecondaryNameNode
95959 Kafka
102971 Jps
9855 NodeManager
89534 QuorumPeerMain

[root@slave2 ~]# jps
88660 QuorumPeerMain
95204 Kafka
9110 NodeManager
8616 DataNode
102104 Jps

关闭 kafka
kafka-server-stop.sh stop

定制脚本一键启动

vim kafka-all.sh
放入 /bin 路径下

#!/bin/bash
if [ $# -eq 0 ] ;
then
echo “please input param:start stop”
else
if [ $1 = start ] ;then
echo “${1}ing master”
ssh master “source /etc/profile;kafka-server-start.sh -daemon /export/server/kafka/config/server.properties”
for i in {1..2}
do
echo “${1}ing slave${i}”
ssh slave${i} “source /etc/profile;kafka-server-start.sh -daemon /export/server/kafka/config/server.properties”
done
fi
if [ $1 = stop ];then
echo “${1}ping master “
ssh master “source /etc/profile;kafka-server-stop.sh”
for i in {1..2}
do
echo “${1}ping slave${i}”
ssh slave${i} “source /etc/profile;kafka-server-stop.sh”
done
fi
fi
**