linuxea: A typical Kafka cluster deployment in a private cloud behind DNAT

marksugar
2022-03-11

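This post sets up a Kafka 2.5.0 cluster. Newer Kafka releases are moving away from ZooKeeper, but those are not covered here.
The environment is a private cloud. The business needs to send data to Kafka over the public internet with SCRAM-SHA-256 authentication, while intranet clients must keep working as before. The internal ports are exposed to the internet through DNAT, and each mapping is necessarily one port to one port. The rough topology is as follows: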

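If your setup is different, the approach in "Kafka: separating internal and external access traffic" (see the references) may solve the problem instead.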

image-20220311232204412.png

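In production you will find that intranet clients, which connect using the internal IPs, negotiate with Kafka and have their requests handled normally.

Access from the public network through ports 9092, 9093, and 9094 on 6.78.5.32 is where the problem appears. A client sends request A to 6.78.5.32:9092, the firewall's DNAT layer forwards it to the backend broker, and the broker answers with its own address, 172.16.100.7:9092, as the endpoint to use. The client has no way to reach 172.16.100.7, so sending fails.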

image-20220311233012743.png

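If the code only sends messages to Kafka and never checks the result, it appears to succeed even though no data is actually written. This leads to a different approach: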

image-20220311234318947.png

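A send needs a response, so both the brokers and the clients work with a hostname mapped to an IP instead of one fixed IP. The brokers advertise a domain name, and each side resolves that name through DNS or its local hosts file. Whether the access comes from the public network or the intranet, the name resolves locally to an IP that side can actually reach, and the response completes.

Some passages of the official documentation describe this hostname-based form as a way to "prevent man-in-the-middle attacks".

As shown below: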

image-20220311234535042.png
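
To make the idea concrete, here is a minimal sketch of the lookup a client effectively performs on the address a broker advertises. It is a toy model, not Kafka client code: the function name reachable_broker, the hosts tables, and the sets of reachable IPs are all assumptions made up for illustration, using the addresses from this post.

```python
from typing import Dict, Optional, Set


def reachable_broker(advertised_host: str,
                     hosts: Dict[str, str],
                     reachable_ips: Set[str]) -> Optional[str]:
    """Return the IP the client would connect to, or None if it cannot connect."""
    # A hostname goes through the local hosts table; a literal IP is used as-is.
    ip = hosts.get(advertised_host, advertised_host)
    return ip if ip in reachable_ips else None


# Public client: only the DNAT address is reachable.
public_hosts = {"kafka.linuxea.com": "6.78.5.32"}
public_reach = {"6.78.5.32"}

# Intranet client: the proxy and the brokers are reachable directly.
intranet_hosts = {"kafka.linuxea.com": "172.16.100.10"}
intranet_reach = {"172.16.100.7", "172.16.100.8", "172.16.100.9", "172.16.100.10"}

# Broker advertises its private IP: the public client is stuck.
print(reachable_broker("172.16.100.7", public_hosts, public_reach))
# Broker advertises the hostname: each side resolves it to an IP it can reach.
print(reachable_broker("kafka.linuxea.com", public_hosts, public_reach))
print(reachable_broker("kafka.linuxea.com", intranet_hosts, intranet_reach))
```

The same advertised name ends up at a different IP on each side, which is why the hosts entries matter as much as the broker configuration.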

  • version: kafka_2.12-2.5.0
  • jdk: 1.8.0_211

Prerequisites:

  • Synchronize time
10 * * * * ntpdate ntp.aliyun.com
  • Set the hostname on each node and add all nodes to the local hosts file
#172.16.100.7
hostnamectl set-hostname   kafka1
#172.16.100.8
hostnamectl set-hostname   kafka2
#172.16.100.9
hostnamectl set-hostname   kafka3
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3

Preparation

Install Java from the binary tarball, or via rpm.

tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local/
cd /usr/local && ln -s jdk1.8.0_211 java
cat > /etc/profile.d/java.sh <<EOF
export JAVA_HOME=/usr/local/java
export PATH=\$JAVA_HOME/bin:\$PATH 
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar 
EOF
source /etc/profile.d/java.sh

On all nodes, create the directories and the user

DPATH=/data
mkdir ${DPATH}/zookeeper/logs ${DPATH}/kafka/ ${DPATH}/logs/ -p
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
chown -R  kafka.kafka ${DPATH}

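Download kafka_2.12-2.5.0, extract it under /usr/local/, and symlink it as kafka. Two variants of the commands follow; run the one that matches the archive you downloaded.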

tar xf kafka_2.12-2.5.1.gz -C /usr/local/
cd /usr/local/
ln -s kafka_2.12-2.5.1 kafka

tar xf kafka_2.12-2.5.0.tgz -C /usr/local/
cd /usr/local/
ln -s kafka_2.12-2.5.0 kafka

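Prepare two JAAS files under /usr/local/kafka/config for Kafka authentication

  • kafka_client_jaas.conf
# Client login settings: the username and password producers and consumers use to connect to the cluster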
cat > /usr/local/kafka/config/kafka_client_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="marksugar"
    password="linuxea.com";
};
EOF

kafka_server_jaas.conf

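The markadmin user is the super administrator. This credentials file is used later when the broker starts, and its password must match the SCRAM credential created for markadmin further down.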
cat > /usr/local/kafka/config/kafka_server_jaas.conf << EOF
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="markadmin"
    password="MwMzA0MGZGFmOG"
    user_markadmin="markadmin";
};
EOF

zookeeper

Back up the default configuration

mv /usr/local/kafka/config/zookeeper.properties /usr/local/kafka/config/zookeeper.properties.bak

1. Edit the configuration file on 172.16.100.7

cat > /usr/local/kafka/config/zookeeper.properties << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888
EOF

2. Create the myid file. The value differs on each node and must match that node's server.N entry

echo "0" > /data/zookeeper/myid
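
Since each node's myid has to match its server.N line, the mapping can be derived from the configuration rather than typed by hand. The following is only a sketch under that assumption; parse_server_ids and myid_for are hypothetical helper names, and the embedded text simply repeats the server list from this post.

```python
import re
from typing import Dict

ZK_PROPERTIES = """\
clientPort=2181
server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888
"""


def parse_server_ids(properties_text: str) -> Dict[str, int]:
    """Map each ZooKeeper host to the N of its server.N entry."""
    ids: Dict[str, int] = {}
    for line in properties_text.splitlines():
        match = re.match(r"^\s*server\.(\d+)\s*=\s*([^:\s]+):\d+:\d+", line)
        if match:
            ids[match.group(2)] = int(match.group(1))
    return ids


def myid_for(host: str, properties_text: str = ZK_PROPERTIES) -> str:
    """Return the content to write into myid for the given host."""
    return str(parse_server_ids(properties_text)[host])


SERVER_IDS = parse_server_ids(ZK_PROPERTIES)
for host, server_id in SERVER_IDS.items():
    print(f"{host}: myid={server_id}")
```

A host that is missing from the server list raises KeyError, which is a useful early warning before ZooKeeper refuses to form a quorum.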

3. Create the systemd unit

cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=ZooKeeper Service
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Environment=ZOO_LOG_DIR=/data/zookeeper/logs
PIDFile=/data/zookeeper/zookeeper_server.pid
User=kafka
Group=kafka
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
#RestartSec=15
#LimitNOFILE=65536
#OOMScoreAdjust=-999
Type=simple
Restart=on-failure

[Install]
WantedBy=default.target
EOF

kafka

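default.replication.factor=2: 1 means no replica, 2 keeps a second copy
num.network.threads=3: should be greater than the CPU count + 1
num.io.threads=8: twice the CPU count

1. Edit the configuration file on 172.16.100.7

We create a new file instead of reusing the original one.

We set advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092 directly. kafka.linuxea.com is added to the local hosts file, pointing at the proxy's IP address.

If no proxy is needed and only cluster-internal access is required, set it to the node's own IP instead.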

cat > /usr/local/kafka/config/server-scram.properties << EOF
broker.id=1
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
######### Log Basics ##########
# Data directory
log.dirs=/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar


num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

# Default partition count and replication factor for new topics
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
EOF

The same file without comments:

cat > /usr/local/kafka/config/server-scram.properties << EOF
broker.id=1
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
log.dirs=/data/kafka/
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
EOF

systemd unit

cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=kafka Service
After=network.target syslog.target

[Service]
Environment=ZOO_LOG_DIR=/data/kafka/logs
SyslogIdentifier=kafka

# Resource limits
LimitFSIZE=infinity
LimitCPU=infinity
LimitAS=infinity
LimitMEMLOCK=infinity
LimitNOFILE=64000
LimitNPROC=64000


User=kafka
Group=kafka
Type=simple
Restart=on-failure
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
Environment="PATH=${PATH}:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=default.target
EOF

Adjust the JVM settings in bin/kafka-server-start.sh: set the heap size, and expose JMX on port 9999 for kafka-eagle

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParalupancyPercent=70"
    export KAFKA_HEAP_OPTS="-server -Xms4G -Xmx4G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200" 
    export JMX_PORT="9999"
fi

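Configuring the other nodes

Copy the Kafka directory and the unit files to /usr/local and /etc/systemd/system on 172.16.100.8 and 172.16.100.9, then adjust them as follows: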

scp -r kafka_2.12-2.5.1 172.16.100.8:/usr/local/
scp -r kafka_2.12-2.5.1 172.16.100.9:/usr/local/
scp /etc/systemd/system/zookeeper.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/zookeeper.service 172.16.100.9:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.8:/etc/systemd/system/
scp /etc/systemd/system/kafka.service 172.16.100.9:/etc/systemd/system/

Log in to 172.16.100.8 and 172.16.100.9, then create the symlink, the directories, and the user

cd /usr/local &&  ln -s kafka_2.12-2.5.1 kafka
mkdir /data/zookeeper/logs -p
mkdir -p /data/kafka/
groupadd -r -g 699 kafka
useradd -u 699 -s /sbin/nologin -c 'kafka server' -g kafka kafka -M
mkdir /data/logs/ -p
chown -R kafka:kafka /data/
chown -R kafka:kafka /usr/local/kafka_2.12-2.5.1/

According to the server list

server.0=172.16.100.7:2888:3888
server.1=172.16.100.8:2888:3888
server.2=172.16.100.9:2888:3888

set the matching myid on each node

172.16.100.8

echo "1" > /data/zookeeper/myid

172.16.100.9

echo "2" > /data/zookeeper/myid

  • kafka

Edit server-scram.properties on each node; the per-node settings below change

172.16.100.8

broker.id=2
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093

172.16.100.9

broker.id=3
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
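
Because only the broker id, the bind address, and the port differ between nodes, the per-node lines can be rendered from a small table. This is an illustrative sketch, not part of the original procedure; listener_lines and NODES are hypothetical names, and the values are the ones used in this post.

```python
from typing import List, Tuple


def listener_lines(broker_id: int, bind_ip: str, port: int,
                   advertised_host: str = "kafka.linuxea.com") -> List[str]:
    """Return the per-node lines of server-scram.properties."""
    return [
        f"broker.id={broker_id}",
        # The broker binds to its own private IP ...
        f"listeners=SASL_PLAINTEXT://{bind_ip}:{port}",
        # ... but tells clients to come back through the shared hostname.
        f"advertised.listeners=SASL_PLAINTEXT://{advertised_host}:{port}",
    ]


# (broker.id, bind IP, port) for the three nodes in this post.
NODES: List[Tuple[int, str, int]] = [
    (1, "172.16.100.7", 9092),
    (2, "172.16.100.8", 9093),
    (3, "172.16.100.9", 9094),
]

for broker_id, ip, port in NODES:
    print("\n".join(listener_lines(broker_id, ip, port)), end="\n\n")
```

Each broker needs its own port because the DNAT rule and the proxy map one public port to exactly one backend.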

The full Kafka configuration on 172.16.100.8 after the changes

broker.id=2
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
######### Log Basics ##########
# Data directory
log.dirs=/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar


num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2

The full Kafka configuration on 172.16.100.9 after the changes

broker.id=3
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
######### Log Basics ##########
# Data directory
log.dirs=/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar


num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2

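Creating SCRAM credentials in ZooKeeper

Start the whole ZooKeeper cluster first, then create credentials for the two users.

  • If JAVA_HOME is not picked up, add the following line to /usr/local/kafka/bin/kafka-run-class.sh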
JAVA_HOME=/usr/local/java
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper

Create the users

The commands:

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
# Create markadmin
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'markadmin'

# Create marksugar
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'marksugar'.

List all SCRAM credentials

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users

Output:

[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
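
The salt, stored_key, and server_key fields in this output follow the SCRAM derivation described in RFC 5802. The sketch below shows that derivation with the Python standard library. It is an approximation for illustration: it skips the SASLprep normalization step, the function name scram_sha256_credential is made up, and the salt is an arbitrary example, so it will not reproduce the exact values printed above. Note also that the output reports iterations=4096 although the command asked for 8192; a plausible explanation is that the second SCRAM-SHA-256 entry, which sets no iteration count, replaced the first and fell back to the default.

```python
import base64
import hashlib
import hmac
from typing import Dict, Union


def scram_sha256_credential(password: str, salt: bytes,
                            iterations: int = 4096) -> Dict[str, Union[str, int]]:
    """Derive the values a SCRAM-SHA-256 server stores for one user."""
    # SaltedPassword = PBKDF2-HMAC-SHA256(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    # ClientKey and ServerKey are HMACs over fixed labels.
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    # Only the hash of ClientKey is stored, never the password itself.
    stored_key = hashlib.sha256(client_key).digest()

    def b64(raw: bytes) -> str:
        return base64.b64encode(raw).decode("ascii")

    return {
        "salt": b64(salt),
        "stored_key": b64(stored_key),
        "server_key": b64(server_key),
        "iterations": iterations,
    }


# Example salt chosen for this sketch only.
example = scram_sha256_credential("linuxea.com", b"example-salt")
print(example)
```

This is why the describe command can show the credential safely: the stored values let the broker verify a login without holding the cleartext password.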

Show the credentials of a single user

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin

Output:

[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096

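Starting Kafka

With the credentials created, start the first broker. First make the kafka user the owner of the installation directory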

chown -R kafka.kafka /usr/local/kafka*

Start it manually first to check that it works

sudo -u kafka KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties

Watch /usr/local/kafka/logs/server.log. When startup succeeds, a line like the following appears

[2021-05-21 17:12:03,524] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

At this point each Kafka node needs hosts entries that cover all broker hostnames and the proxy hostname

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com

If everything works, enable start on boot

systemctl enable kafka
systemctl start kafka
systemctl status kafka

Then start the other two brokers in turn.

Verifying the users

Create a topic

/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.16.100.7:2181 --create --topic test --partitions 12  --replication-factor 3

Send messages

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"

cat producer.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";

Intranet access

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test  --producer.config producer.conf
> hello

Remote access

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test  --producer.config producer.conf

Consume messages

cat consumer.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";

Intranet access

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094  --topic test --from-beginning --consumer.config consumer.conf
hello

Remote access

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning --consumer.config consumer.conf

Building the proxy layer

nginx stream configuration

stream {
log_format proxy '$remote_addr [$time_local] '
                '$protocol $status $bytes_sent $bytes_received '
                '$session_time "$upstream_addr" '
                '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    upstream kafka1 {
        server kafka1:9092 weight=1;
    }
    server {
        listen 9092;
        proxy_pass kafka1;
        access_log /data/logs/9092.log proxy ;
    }

    upstream kafka2 {
        server kafka2:9093 weight=1;
    }
    server {
        listen 9093;
        proxy_pass kafka2;
        access_log /data/logs/9093.log proxy ;
    }

    upstream kafka3 {
        server kafka3:9094 weight=1;
    }
    server {
        listen 9094;
        proxy_pass kafka3;
        access_log /data/logs/9094.log proxy ;
    }
}

Add hosts entries on the proxy; the Kafka nodes need the same entries

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com

Testing Kafka connectivity

The test machine also needs a hosts entry that points at the proxy

172.16.100.10 kafka.linuxea.com
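
Before involving a Kafka client at all, it can help to confirm that the three proxy ports accept TCP connections from the test machine. The sketch below uses only the standard library; port_open is a hypothetical helper name, and the check proves only that something is listening, not that authentication will succeed.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # The hostname is resolved by the system resolver, so hosts entries apply.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Example usage on the test machine:
# for port in (9092, 9093, 9094):
#     print(port, port_open("kafka.linuxea.com", port))
```

If a port is closed here, the problem lies in the hosts entry, the DNAT rule, or the nginx stream block rather than in Kafka itself.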

Install Python 3.8 and the confluent_kafka package

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple confluent_kafka 

Or, from the Aliyun mirror

pip install -i https://mirrors.aliyun.com/pypi/simple confluent_kafka

The script

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from confluent_kafka import Producer

import json
from datetime import datetime

"""
def producer_demo():
    # Assume messages are key-value pairs (not required) serialized as JSON
    producer = KafkaProducer(bootstrap_servers=['IP:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             #sasl_mechanism="SCRAM-SHA-256",
                             sasl_mechanism='PLAIN',
                             #sasl_kerberos_service_name='admin',
                             #sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='admin',
                             sasl_plain_password="*admin.com",
                             #key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                             value_serializer=lambda v: json.dumps(v).encode('utf-8')
                            # ,api_version=(0, 10)
                             )  # connect to Kafka


    msg_dict = "Hello World".encode('utf-8')  # payload; must be bytes

    for i in range(0, 3):
        #msg = json.dumps(msg_dict)
        future = producer.send('test', msg_dict, partition=0)

        try:
            future.get(timeout=10)  # wait for the send result
        except kafka_errors:  # raised when the send fails
            traceback.format_exc()

    producer.close()

"""
def confluentKafkaDemo():
    topic_name = 'test'  # topic to produce to
    count = 100
    start = 0
    conf = {
        'bootstrap.servers': 'kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'marksugar',  # SCRAM user created earlier
        'sasl.password': 'linuxea.com'  # password of that user
    }
    producer = Producer(**conf)
    data = {
        'name': 'test1 is ok',
        'time': str(datetime.now())
    }

    try:
        while start < count:
            producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)
            producer.flush()
            start = start+1
    except Exception as e:
        print(e)

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    #producer_demo()
    confluentKafkaDemo()

Run the script and check that the messages are written successfully

image-20220311235259397.png
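
Delivery callbacks confirm that the broker accepted each message; reading the messages back through the proxy confirms the full round trip. The following consumer is a sketch, not part of the original post: build_consumer_conf, consume, and the group id linuxea-demo-group are assumptions, and the credentials are the marksugar user created earlier. confluent_kafka is imported inside consume so the configuration helper can be used without the package installed.

```python
from typing import Dict, List


def build_consumer_conf(username: str, password: str,
                        group_id: str = "linuxea-demo-group") -> Dict[str, str]:
    """Build a confluent_kafka consumer configuration for the proxied cluster."""
    return {
        "bootstrap.servers": "kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": username,
        "sasl.password": password,
        "group.id": group_id,            # hypothetical group name
        "auto.offset.reset": "earliest",  # start from the oldest message
    }


def consume(conf: Dict[str, str], topic: str,
            max_messages: int = 10, timeout: float = 5.0) -> List[bytes]:
    """Read up to max_messages from topic; stop when a poll times out."""
    from confluent_kafka import Consumer  # imported lazily on purpose

    consumer = Consumer(conf)
    consumer.subscribe([topic])
    received: List[bytes] = []
    try:
        while len(received) < max_messages:
            msg = consumer.poll(timeout)
            if msg is None:
                break  # nothing arrived within the timeout
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            received.append(msg.value())
    finally:
        consumer.close()
    return received


# Example usage:
# for value in consume(build_consumer_conf("marksugar", "linuxea.com"), "test"):
#     print(value.decode())
```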

kafka-eagle

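kafka-eagle does not work perfectly against a cluster that has user authentication enabled; there are always some rough edges.

The kafka-eagle host still needs hosts entries for the brokers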

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3

Download version 2.0.5

tar xf kafka-eagle-bin-2.0.5.tar.gz  -C /usr/local/
cd /usr/local/kafka-eagle-bin-2.0.5
tar xf kafka-eagle-web-2.0.5-bin.tar.gz
ln -s /usr/local/kafka-eagle-bin-2.0.5/ /usr/local/kafka-eagle
cp /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties.bak
mkdir /data/kafka-eagle/db/ -p

To keep things simple, the hostname of the kafka-eagle host must be set to its IP address

hostnamectl set-hostname 192.168.3.6

Configure environment variables

cat > /etc/profile.d/kafka-eagle.sh <<EOF
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-web-2.0.5
export PATH=\$PATH:\$JAVA_HOME/bin:\$KE_HOME/bin
EOF
source /etc/profile.d/kafka-eagle.sh

java

export JAVA_HOME=/usr/local/jdk1.8.0_211
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

Create ke.db

cd /data/kafka-eagle/db/ && sqlite3 ke.db

In the configuration file, remove the cluster2 entries, set the ZooKeeper addresses, and enable SASL authentication

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka jmx uri
######################################
cluster1.kafka.eagle.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=true
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="markadmin.com";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=


######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/data/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

Start

/usr/local/kafka-eagle/kafka-eagle-web-2.0.5/bin/ke.sh start

20210522224349.png

Increasing Kafka replicas

First write the topic list to linuxea_position_shaanxi_1.json:

{
"topics": [
{"topic": "linuxea_position_shaanxi_1"}
],
"version": 1
}
[root@linuxea06 bin]#  ./kafka-reassign-partitions.sh --zookeeper 172.16.100.9:2181 --topics-to-move-json-file linuxea_position_shaanxi_1.json --broker-list "0,1,2,3,4,5" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}

Copy the proposed plan into the JSON file used for the reassignment

{"version":1,"partitions":[
{"topic":"linuxea_position_shaanxi_1","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":5,"replicas":[2,1,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
{"topic":"linuxea_position_shaanxi_1","partition":4,"replicas":[1,3,2],"log_dirs":["any","any","any"]}]}

Execute

./kafka-reassign-partitions.sh --zookeeper   172.16.100.9:2181  --reassignment-json-file linuxea_position_shaanxi_1.json --execute

Verify

[root@linuxea06 bin]# ./kafka-reassign-partitions.sh --zookeeper   172.16.100.9:2181  --reassignment-json-file linuxea_position_shaanxi_1.json --verify
Status of partition reassignment:
Reassignment of partition linuxea_position_shaanxi_1-3 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-0 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-5 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-4 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-1 completed successfully
Reassignment of partition linuxea_position_shaanxi_1-2 completed successfully
[root@linuxea06 bin]#
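
The --generate step proposes a plan that keeps the current replica count. When the goal is to raise the number of replicas, the reassignment JSON is usually written by hand. Below is a small sketch that builds such a plan; build_reassignment is a hypothetical helper, and the round-robin placement is an assumption of this example, not what Kafka's own tool computes.

```python
import json
from typing import Dict, List


def build_reassignment(topic: str, partitions: int, brokers: List[int],
                       replication_factor: int) -> Dict:
    """Build a reassignment plan with replication_factor replicas per partition."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    plan = []
    for partition in range(partitions):
        # Rotate the starting broker so leaders spread across the cluster.
        replicas = [brokers[(partition + i) % len(brokers)]
                    for i in range(replication_factor)]
        plan.append({"topic": topic, "partition": partition, "replicas": replicas})
    return {"version": 1, "partitions": plan}


plan = build_reassignment("linuxea_position_shaanxi_1", 6, [1, 2, 3], 3)
print(json.dumps(plan, indent=1))
```

The printed JSON can be saved to a file and passed to kafka-reassign-partitions.sh with --reassignment-json-file, in the same way as the copied plan above.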

Deleting Kafka topics

List the topics

/usr/local/kafka/bin/kafka-topics.sh --list  --zookeeper 127.0.0.1:2181

Delete

/usr/local/kafka/bin/kafka-topics.sh --delete  --zookeeper 17.168.0.174:2181  --topic test1
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Create

Replicas: --replication-factor 3

Partitions: --partitions 18

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 18 --topic topic1

Deleting a consumer group

Use kafka-consumer-groups.sh to list the consumer groups

[root@linuxea06 bin]# ./kafka-consumer-groups.sh --bootstrap-server 172.16.100.9:9092 --list  --command-config ../config/admin.conf
linuxea-python-consumer-group-position_3
spring-boot-group-position-agg-linuxea-pre
spring-boot-group-position-agg-linuxea-1
linuxea-consumer-third-party
spring-boot-alarm-outage-shaanxi-group1
spring-boot-alarm-offlinenotask-linuxea-group1
spring-boot-group-position-linuxea-2
spring-boot-alarm-outage-linuxea-group1

Delete the group's offsets for a topic

./kafka-consumer-groups.sh \
  --bootstrap-server <bootstrap-server-url> \
  --delete-offsets \
  --group spring-boot-group-position-agg-linuxea-1 \
  --topic linuxea_position_shaanxi_1

Troubleshooting

Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:180)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:61)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$23.handleFailure(KafkaAdminClient.java:2773)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:641)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:757)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:825)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1119)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

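This timeout appears when the command connects without credentials. Pass the client authentication settings with --command-config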

 --command-config ../config/admin.conf
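
The post does not show the contents of admin.conf. A reasonable assumption is that it carries the same three SASL settings as producer.conf and consumer.conf, with the administrator's credentials. The sketch below renders such a file; client_properties is a hypothetical helper, and it rejects quotes and backslashes in the values instead of guessing at escaping rules.

```python
def client_properties(username: str, password: str) -> str:
    """Render SASL/SCRAM client settings for a Kafka command-line tool."""
    for value in (username, password):
        if '"' in value or "\\" in value:
            raise ValueError("quotes and backslashes are not handled by this sketch")
    jaas = ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";')
    lines = [
        "security.protocol=SASL_PLAINTEXT",
        "sasl.mechanism=SCRAM-SHA-256",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


# Placeholder password; use the one set when the markadmin credential was created.
print(client_properties("markadmin", "change-me"), end="")
```

Writing the output to ../config/admin.conf gives the file the commands in this section refer to.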

References

kafka-consumer-group-script-to-see-all-consumer-group-not-working
how-to-remove-a-kafka-consumer-group-from-a-specific-topic
kafka-consumer-groups.sh consumer group management
Kafka: separating internal and external access traffic
