今天配置的是一个2.5.0的一个kafka集群,新的版本将废弃zookeeper,今天不讨论新版本
有一个私有的云环境,业务需求希望通过公网向kafka发送数据,使用SCRAM-SHA-256加密,内网仍然需要能够正常访问, 而其中,需要通过DNAT的方式来映射内网端口暴漏给互联网。而在做映射的时候,必然是一个端口对一个端口的,于是,大致的示意拓扑如下
如果你不是这种方式,可以尝试Kafka 实现内外网访问流量分离来解决问题
而在实际的生产中,你会发现,内网采用内网IP进行访问的时候,kafka是可以正常协商进行处理请求
而在公网通过6.78.5.32的9092,9093,9094
端口访问的时候会出现出现一个问题,客户端当请求A通过6.78.5.32:9092
发送,经过防火墙DNAT层后,发给后端kafka,而此时kafka收到消息后回复给发送者,而回复的时候是使用的172.16.100.7:9092
端口,你的客户端根本就不认识172.16.100.7
,因此发送失败
而这个现象在你只是向kafka发送消息,而不在乎他是否返回的时候,代码层面显示是成功的,但是数据并未成功插入。于是,就有了另外一种方式
消息发送后需要返回,服务端和客户端都分别写ip和hostname,通过域名和本地hosts的方式解析出ip,分别发送到代理服务器和客户端,而不是某一个固定的ip。无论来自公网的访问还是内网的访问,最终在本地的hosts各自指向一个可以被访问到的一个ip,从而完成响应。
这种形式在官网的某些字段中被解读为“防止中间人攻击”
如下
- version: kafka_2.12-2.5.0
- jdk: 1.8.0_211
先决条件:
- 同步时间
10 * * * * ntpdate ntp.aliyun.com
- 修改hosts并本地hosts
#172.16.100.7
hostnamectl set-hostname kafka1
#172.16.100.8
hostnamectl set-hostname kafka2
#172.16.100.9
hostnamectl set-hostname kafka3
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
准备工作
二进制安装java,或者rpm安装即可
tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local/
cd /usr/local && ln -s jdk1.8.0_211 java
cat > /etc/profile.d/java.sh <<EOF
export JAVA_HOME=/usr/local/java
export PATH=\$JAVA_HOME/bin:\$PATH
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile.d/java.sh
准备工作,所有节点创建目录和用户
DPATH=/data
mkdir ${DPATH}/zookeeper/logs ${DPATH}/kafka/ ${DPATH}/logs/ -p
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
chown -R kafka.kafka ${DPATH}
下载kafka_2.12-2.5.0,解压到/usr/local/下,创建软连接到当前的kafka
tar xf kafka_2.12-2.5.1.gz -C /usr/local/
cd /usr/local/
ln -s kafka_2.12-2.5.1 kafka
tar xf kafka_2.12-2.5.0.tgz -C /usr/local/
cd /usr/local/
ln -s kafka_2.12-2.5.0 kafka
/usr/local/kafka/config
准备两个认证文件作为kafka认证
- kafka_client_jaas.conf
#kafka客户端连接方式及生产者、消费者连接集群的用户密码
cat > /usr/local/kafka/config/kafka_client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="marksugar"
password="linuxea.com";
};
EOF
#kafka客户端连接方式及生产者、消费者连接集群的用户密码
cat > /usr/local/kafka/config/kafka_client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="marksugar"
password="linuolxloADMINXP[QP[1]]";
};
EOF
kafka_server_jaas.conf
markadmin用户作为超级管理员,这里的用户和密码文件是为了后面启动使用
cat > /usr/local/kafka/config/kafka_server_jaas.conf << EOF
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="markadmin"
password="MwMzA0MGZGFmOG"
user_markadmin="markadmin";
};
EOF
zookeeper
备份
mv /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/config/zookeeper.properties.bak
1, 在10.100.63.7修改配置文件
cat > /usr/local/kafka/config/zookeeper.properties << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888
EOF
2, 创建id。每台节点不一样
echo "0" > /data/zookeeper/myid
3, 启动脚本
cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=ZooKeeper Service
After=network.target
After=network-online.target
Wants=network-online.target
[Service]
Environment=ZOO_LOG_DIR=/u01/data/zookeeper/logs
PIDFile=/data/zookeeper/zookeeper_server.pid
User=kafka
Group=kafka
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
#RestartSec=15
#LimitNOFILE=65536
#OOMScoreAdjust=-999
Type=simple
Restart=on-failure
[Install]
WantedBy=default.target
EOF
kafka
default.replication.factor=2 1不备份,2备份
num.network.threads=3 大于CPU+1
num.io.threads=8 cpu的两倍
1, 在10.100.63.7修改配置文件
我们新创建一个文件,不用原来的文件
我们直接配置一个advertised.listeners=SASL_PLAINTEXT://kakfa.linuxea.com:9092,kakfa.linuxea.com在本地hosts写入,写入的ip是代理的ip地址
假设不需要代理,而只是集群访问,则配置为当前的IP 即可
cat > /usr/local/kafka/config/server-scram.properties << EOF
broker.id=1
##### Socket Server Settings 监听协议和端口#######
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
######### Log Basics ##########
#日志路径
log.dirs=/data/kafka/
#num.partitions=16
######## Zookeeper 集群信息 ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings 认证部分########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
# 创建三个副本和分区
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
EOF
执行
cat > /usr/local/kafka/config/server-scram.properties << EOF
broker.id=0
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
log.dirs=/data/kafka/
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
EOF
启动脚本
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=kafka Service
After=network.target syslog.target
[Service]
Environment=ZOO_LOG_DIR=/data/kafka/logs
SyslogIdentifier=kafka
# 添加limit参数
LimitFSIZE=infinity
LimitCPU=infinity
LimitAS=infinity
LimitMEMLOCK=infinity
LimitNOFILE=64000
LimitNPROC=64000
User=kafka
Group=kafka
Type=simple
Restart=on-failure
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
Environment="PATH=${PATH}:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=default.target
EOF
修改java配置,bin下的kafka-server-start.sh,配置内存大小,并且配置9999端口eagle
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParalupancyPercent=70"
export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
export JMX_PORT="9999"
fi
配置
----- 将kafka程序目录复制和启动脚本到172.16.100.8和172.16.100.9的/usr/local下,修改如下:
scp -r kafka_2.12-2.5.1 172.16.100.8:/usr/local/
scp -r kafka_2.12-2.5.1 172.16.100.9:/usr/local/
scp /etc/systemd/system/zookeeper.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/zookeeper.service 172.16.100.9:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.9:/etc/systemd/system/
登录到172.16.100.8,172.16.100.9创建目录,做软连接
cd /usr/local && ln -s kafka_2.12-2.5.1 kafka
mkdir /u01/data/zookeeper/logs -p
mkdir -p /u01/data/kafka/
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
mkdir /u01/data/logs/ -p
chown -R kafka.kafka /u01/data/
chown -R /usr/local/kafka_2.12-2.5.1/ kafka.kafka*
根据
server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888
对应修改
172.16.100.8
echo "1" > /u01/data/zookeeper/myid
172.16.100.9
echo "2" > /u01/data/zookeeper/myid
- kafka
修改server-scram.properties文件内容,这四项修改
172.16.100.8
broker.id=2
##### Socket Server Settings 监听协议和端口#######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
172.16.100.9
broker.id=3
##### Socket Server Settings 监听协议和端口#######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
172.16.100.8 kafka配置修改后如下
broker.id=2
##### Socket Server Settings 监听协议和端口#######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
######### Log Basics ##########
#日志路径
log.dirs=/u01/data/kafka/
#num.partitions=16
######## Zookeeper 集群信息 ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings 认证部分########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
172.16.100.9 kafka配置修改后如下
broker.id=3
##### Socket Server Settings 监听协议和端口#######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
######### Log Basics ##########
#日志路径
log.dirs=/u01/data/kafka/
#num.partitions=16
######## Zookeeper 集群信息 ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings 认证部分########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
zookeeper授权
先启动整个zookeeper集群,分别授权两个用户
- 如果环境变量有问题可以在脚本/usr/local/kafka/bin/kafka-run-class.sh里面添加
JAVA_HOME=/usr/local/java
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper
开始创建用户
创建语句
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
# 开始创建markadmin
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'markadmin'
# 开始创建marksugar
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'marksugar'.
查看所有SCRAM证书
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
如下
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
查看单个用户的证书
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
查看
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 10.100.63.9:2181 --describe --entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
启动kafka
授权完成,启动第一台kafka。对目录进行授权
chown -R kafka.kafka /usr/local/kafka*
先手动启动测试是否正常
sudo -u kafka KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
观察日志/usr/local/kafka/logs/server.log,正常情况下能够看到如下提示已经启动
[2021-05-21 17:12:03,524] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
此时kafka需要配置hosts,hosts包含所有的主机名和代理主机名
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
如果没用问题配置开启启动
systemctl enable kafka
systemctl start kafka
systemctl status kafka
并且以此启动其他两台
验证用户
创建主题
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 10.100.63.7:2181 --create --topic test --partitions 12 --replication-factor 3
发送消息
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
cat producer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="xyt#*admin.com&!k4";
内网访问
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --producer.config producer.conf
> hello
远程
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test --producer.config producer.conf
消费消息
cat consumer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin.com&!k4";
内网访问
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --from-beginning --consumer.config consumer.conf
hello
远程
/usr/local/kafka/bin/kafka-console-consumer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning --consumer.config consumer.conf
构建代理层
nginx stream配置
stream {
log_format proxy '$remote_addr [$time_local]'
'$protocol $status $bytes_sent $bytes_received'
'$session_time "$upstream_addr" '
'"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
upstream kafka1 {
server kafka1:9092 weight=1;
}
server {
listen 9092;
proxy_pass kafka1;
access_log /data/logs/9092.log proxy ;
}
upstream kafka2 {
server kafka2:9093 weight=1;
}
server {
listen 9093;
proxy_pass kafka2;
access_log /data/logs/9093.log proxy ;
}
upstream kafka3 {
server kafka3:9094 weight=1;
}
server {
listen 9094;
proxy_pass kafka3;
access_log /data/logs/9094.log proxy ;
}
}
添加hosts,并且kafka节点也要如下配置
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
测试kafka连通性
测试节点也需要配置hosts指向proxy
172.16.100.10 kafka.linuxea.com
安装 python 3.8 ,并且安装confluent_kafka
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple confluent_kafka
And
pip install -i https://mirrors.aliyun.com/pypi/simple confluent_kafka
脚本如下
# !/usr/bin/python
# #encoding=utf-8
from confluent_kafka import Producer
import json
from datetime import datetime
"""
def producer_demo():
# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
producer = KafkaProducer(bootstrap_servers=['IP:9092'],
security_protocol='SASL_PLAINTEXT',
#sasl_mechanism="SCRAM-SHA-256",
sasl_mechanism='PLAIN',
#sasl_kerberos_service_name='admin',
#sasl_kerberos_domain_name='hadoop.hadoop.com',
sasl_plain_username='admin',
sasl_plain_password="*admin.com",
#key_serializer=lambda k: json.dumps(k).encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode('utf-8')
# ,api_version=(0, 10)
) # 连接kafka
msg_dict = "Hello World".encode('utf-8') # 发送内容,必须是bytes类型
for i in range(0, 3):
#msg = json.dumps(msg_dict)
future = producer.send('test', msg_dict, partition=0)
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
producer.close()
"""
def confluentKafkaDemo():
topic_name = 'test' ##
count = 100
start = 0
conf = {
'bootstrap.servers': 'kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanisms': 'SCRAM-SHA-256',
'sasl.username': 'linuxea', ## 用户名
'sasl.password': 'MwMzA0MGFmOG' ## 密码
}
producer = Producer(**conf)
data = {
'name': 'test1 is ok',
'time': str(datetime.now())
}
try:
while start < count:
producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)
producer.flush()
start = start+1
except Exception as e:
print(e)
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
if __name__ == '__main__':
#producer_demo()
confluentKafkaDemo()
执行脚本查看是否插入成功
kafka-eagle
kafka-eagle在被使用了用户验证的集群将能不能够正常使用,总会有一些瑕疵
kafka-eagle上仍然需要做hosts解析
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
下载2.0.5
tar xf kafka-eagle-bin-2.0.5.tar.gz -C /usr/local/
cd /usr/local/kafka-eagle-bin-2.0.5
tar xf kafka-eagle-web-2.0.5-bin.tar.gz
ln -s /usr/local/kafka-eagle-bin-2.0.5/ /usr/local/kafka-eagle
cp /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties.bak
mkdir /data/kafka-eagle/db/ -p
为了方便,kafka-eagle必须修改hostname为ip地址
hostnamectl set-hostname 192.168.3.6
配置环境变量
cat > /etc/profile.d/kafka-eagle.sh <<EOF
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-web-2.0.5
export PATH=\$PATH:\$JAVA_HOME/bin:\$KE_HOME/bin
EOF
source /etc/profile.d/kafka-eagle.sh
java
export JAVA_HOME=/usr/local/jdk1.8.0_211
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
创建ke.db
cd /data/kafka-eagle/db/ && sqlite3 ke.db
配置文件中删掉cluster2的配置。修改zk地址,sasl开启验证
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32
######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# kafka jmx uri
######################################
cluster1.kafka.eagle.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=true
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="markadmin.com";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/data/kafka-eagle/db2.0.5/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
启动
/usr/local/kafka-eagle/kafka-eagle-web-2.0.5/bin/ke.sh start
kafka增加副本
{
"topics": [
{"topic": "linuxea_position_shaanxi_1"}
],
"version": 1
}
[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --topics-to-move-json-file linuxea_position_shaanxi_1.json --broker-list "0,1,2,3,4,5" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}
复制
{"version":1,"partitions":[
{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}
执行
./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json -execute
查看
[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --reassignment-json-file linuxea_position_shaanxi_1.json --verify
Status of partition reassignment:
Reassignment of partition linuxea_position_shaanxi_1-3 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-0 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-5 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-4 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-1 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-2 completed successfully
[root@linuxea06 bin]#
kafka删除topics
leder列出
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
删除
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 17.168.0.174:2181 --topic test1
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
创建
节点数:--replication-factor 3
分区: --partitions 18
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 18 --topic topic1
删除group
我们使用kafka-consumer-groups.sh列出consumer
[root@linuxea06 bin]# ./kafka-consumer-groups.sh --bootstrap-server 172.16.100.9:9092 --list --command-config ../config/admin.conf
linuxea-python-consumer-group-position_3
spring-boot-group-position-agg-linuxea-pre
spring-boot-group-position-agg-linuxea-1
linuxea-consumer-third-party
spring-boot-alarm-outage-shaanxi-group1
spring-boot-alarm-offlinenotask-linuxea-group1
spring-boot-group-position-linuxea-2
spring-boot-alarm-outage-linuxea-group1
开始删除
./kafka-consumer-groups.sh \
--bootstrap-server <bootstrap-server-url> \
--delete-offsets \
--group linuxea_position_shaanxi_1 \
--topic spring-boot-group-position-agg-linuxea-1
报错解决
Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:180)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:61)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
at org.apache.kafka.clients.admin.KafkaAdminClient$23.handleFailure(KafkaAdminClient.java:2773)
at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:641)
at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:757)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:825)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1119)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
添加认证信息
--command-config ../config/admin.conf
参考
kafka-consumer-group-script-to-see-all-consumer-group-not-working
how-to-remove-a-kafka-consumer-group-from-a-specific-topic
kafka-consumer-groups.sh消费者组管理
Kafka 实现内外网访问流量分离
评论