返回顶部
首页 > 资讯 > 后端开发 > Python >Kafka 集群配置SASL+ACL
  • 179
分享到

Kafka 集群配置SASL+ACL

集群KafkaACL 2023-01-31 08:01:43 179人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

在kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接ZooKeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。

kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接ZooKeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

本文主要使用SASL+ACL

配置文件

修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:

listeners=SASL_PLaiNTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin


 

其他参数讲解,请参考链接:

https://www.cnblogs.com/xiao987334176/p/10065844.html

 

这里主要讲解几个重点参数

 

默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问。如果你想改变这种方式你可以做如下配置

allow.everyone.if.no.acl.found=true

什么意思呢?上面的配置已经启动了acl,除了超级用户之外,其他用户无法访问。那么问题就来了,在kafka集群中,其它节点需要同步数据,需要相互访问。

它默认会使用ANONYMOUS的用户名连接集群。在这种情况下,启动kafka集群,必然失败!所以这个参数一定要配置才行!

 

listeners=SASL_PLAINTEXT://:9092

这个参数,表示kafka监听的地址。此参数必须要配置,默认是注释掉的。默认会使用listeners=PLAINTEXT://:9092,但是我现在开启了SASL,必须使用SASL协议连接才行。

//:9092 这里虽然没有写IP地址,根据官方解释,它会监听所有IP。注意:这里只能是IP地址,不能是域名。否则启动时,会提示无法绑定IP。

 

advertised.listeners 这个参数,表示外部的连接地址。这里可以写域名,也可以写IP地址。建议使用域名,为什么呢?因为IP可能会变动,但是主机名是不会变动的。

所以在java代码里面写死,就可以了!注意:必须是SASL协议才行!

 

super.users=User:admin  表示启动超级用户admin,注意:此用户名不允许更改,否则使用生产模式时,会有异常!

 

启动脚本

bin/kafka-server-start.sh 这个是kafka的启动脚本,要使用ACL,需要增加一个参数才行。

有2种方法修改,这里分别介绍一下:

1. 增加环境变量KAFKA_OPTS(推荐)

先来看一下,默认的bin/kafka-server-start.sh的最后一行

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

只需要在最后一行的上面一行,添加一个环境变量即可

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

 

2. 增加参数-Djava.security.auth.login.config

直接将最后一行修改为

exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"

 

JAAS文件

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
passWord="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};


 

这个文件,是专门用来做认证的。用户名和密码的格式如下:

user_用户名="密码"

 

注意:对于超级用户,这几行是固定的

username="admin"
password="123456"
user_admin="admin"

这里指定的是admin用户密码为123456,密码可自行更改。

下面的,才是普通用户。最后一个用户,要有一个分号才行!

 

环境介绍

本文采用的环境,参考以下链接

Https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0

 

使用了3台zookeeper和5台kafka。都是在一台服务器上面运行的!

其中zookeeper的镜像,不需要变动,直接启动即可。

但是kafka的镜像,需要重新构建,请看下面的内容。

 

创建镜像

创建空目录

mkdir /opt/kafka_cluster_acl

 

Dockerfile

FROM ubuntu:16.04
# 修改更新源为阿里云
ADD sources.list /etc/apt/sources.list
ADD kafka_2.12-2.1.0.tgz /
ADD kafka_cluster_jaas.conf /
# 安装jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all

EXPOSE 9092
# 添加启动脚本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]


 

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};


 

run.sh

#!/bin/bash

if [ -z $broker_id ];then
    echo "broker_id变量不能为空"
    exit 1
fi

if [ -z $zookeeper ];then
    echo "zookeeper变量不能为空"
    exit 2
fi

if [ -z $advertised_hostname ];then
    echo "advertised_hostname变量不能为空"
    exit 3
fi

# 开启kafka acl验证
echo "

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin

" >> /kafka_2.12-2.1.0/config/server.properties

cd /kafka_2.12-2.1.0
# 设置唯一id
sed -i "21s/0/$broker_id/" /kafka_2.12-2.1.0/config/server.properties
# 设置zookeeper连接地址
sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties

# 配置启动脚本,最后一行之前添加环境变量
sed -i -e "44"i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh

# 添加配置文件
mv /kafka_cluster_jaas.conf /kafka_2.12-2.1.0/config/

# 临时添加5条hosts
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts

# 启动kafka
bin/kafka-server-start.sh config/server.properties

 

注意:由于没有DNS,这里临时添加了5条hosts记录。5台kafka之间,必须要相互连通,否则会报错

WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.networkClient)

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe


 

此时,目录结构如下:

./
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list


 

生成镜像

docker build -t kafka_cluster_acl /opt/kafka_cluster_acl

 

启动镜像

请确保已经启动了3台zookeeper的镜像!

第一个kafka节点

docker run -it -p 9092:9092 -e broker_id=1 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-1.default.svc.cluster.local  --network br1 --ip=172.168.0.5 kafka_cluster_acl

 

第二个kafka节点

docker run -it -p 9093:9092 -e broker_id=2 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-2.default.svc.cluster.local --network br1 --ip=172.168.0.6 kafka_cluster_acl

 

第三个kafka节点

docker run -it -p 9094:9092 -e broker_id=3 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-3.default.svc.cluster.local --network br1 --ip=172.168.0.7 kafka_cluster_acl

 

第四个kafka节点

docker run -it -p 9095:9092 -e broker_id=4 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-4.default.svc.cluster.local --network br1 --ip=172.168.0.8 kafka_cluster_acl

 

第五个kafka节点

docker run -it -p 9096:9092 -e broker_id=5 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-5.default.svc.cluster.local --network br1 --ip=172.168.0.9 kafka_cluster_acl

 

客户端测试

shell脚本客户端

先来查看docker进程

复制代码

root@jqb-node128:~# docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED              STATUS              PORTS                    NAMES
a5ff3c8f5c2a        kafka_cluster_acl   "/run.sh"           About a minute aGo   Up About a minute   0.0.0.0:9096->9092/tcp   gifted_jones
36a4d94054b5        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9095->9092/tcp   modest_khorana
f614d734ac8b        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9094->9092/tcp   tender_kare
29ef9a2edd08        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9093->9092/tcp   reverent_jepsen
d9cd45c62e86        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9092->9092/tcp   silly_mcclintock
69dba560bc09        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2183->2181/tcp   confident_fermat
d73a01e76949        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2182->2181/tcp   admiring_snyder
7ccab68252e7        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2181->2181/tcp   gifted_wilson

复制代码

 

确保已经运行了5个kafka和3个zk

 

随便进入一个kafka容器

root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bashroot@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/

 

新增一个配置文件 kafka_client_jaas.conf 

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# apt-get install -y vimroot@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi config/kafka_client_jaas.conf

 

内容如下:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"password="123456";
};

 

同理我们也要将配置文件内容传递给JVM, 因此需要修改。

生产者

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-producer.sh

最后一行的上面,添加 KAFKA_OPTS 变量

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"


修改生产者配置文件,最后一行追加2行内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

使用echo 追加

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/producer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/producer.properties


消费者

修改生产者配置文件,使用echo追加

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/consumer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/consumer.properties


编辑测试脚本

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-consumer.sh

最后一行的上面,添加 KAFKA_OPTS 变量

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"


测试生产者

目前还没有topic,先来创建一个topic

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test --partitions 1 --replication-factor 1

Created topic "test".
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0#

 

进入生产者模式,指定kafka的服务器为第一个kafka。当然,只要是5个kafka中的任意一个即可!

输入消息 fdsa,回车

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>fdsa
[2018-12-17 08:45:15,455] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLogginGCallback)
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
[2018-12-17 08:45:15,457] ERROR [Producer clientId=console-producer] Connection to node -1 (d9cd45c62e86.br1/172.168.0.5:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)


会出现报错,则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL

 

配置ACL

kafka的ACL规则,是存储在zookeeper中的,只需要连接zookeeper即可!

topic权限

允许writer用户有所有权限,访问所有topic

--operation All 表示所有权限,

--topic=* 表示所有topic

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --topic=*
Adding ACLs for resource `Topic:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 

Current ACLs for resource `Topic:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: *


 

组权限

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All -group=*
Adding ACLs for resource `Group:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 

Current ACLs for resource `Group:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: *


 

再次测试

root@e0bb740ac0ce:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>123
>


注意:在config/server.properties 文件中,设置了

advertised.listeners=SASL_PLAINTEXT://kafka-1.default.svc.cluster.local:9092

所以连接地址,必须是指定域名才可以!

 

再开一个窗口,连接同样的容器

root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/


启动消费者模式

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning  --consumer.config config/consumer.properties
123

收到123表示成功了!

 

python客户端测试

由于真实主机无法直接连接到网桥的地址172.168.0.5,那么因此代码需要在

创建空目录

mkdir /opt/py_test

 

放2个文件

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

 

produer_consumer_acl_test.py

#!/usr/bin/env python3coding: utf-8
# 注意:需要手动创建topic才行执行此脚本

import sys
import io

def setup_io():  # 设置默认屏幕输出为utf-8编码
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

import time
from kafka import KafkaProducer
from kafka import KafkaConsumer


class KafkaClient(object):  # kafka客户端程序
    def __init__(self, kafka_server, port, topic,content,username,password):
        self.kafka_server = kafka_server  # kafka服务器ip地址
        self.port = port  # kafka端口
        self.topic = topic  # topic名
        self.content = content  # 发送内容
        self.username = username  # 用户名
        self.password = password  # 密码

    def producer(self):
        """
        生产者模式
        :return: object
        """

        # 连接kafka服务器,比如['192.138.150.193:9092']
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                 security_protocol="SASL_PLAINTEXT",  # 指定SASL安全协议
                                 sasl_mechanism='PLAIN',  # 配置SASL机制
                                 sasl_plain_username=self.username,  # 认证用户名
                                 sasl_plain_password=self.password,  # 密码
                                 )

        producer.send(self.topic, self.content)  # 发送消息,必须是二进制
        producer.flush()  # flush确保所有meg都传送给broker
        producer.close()
        return producer

    def consumer(self):
        """
        消费者模式
        :return: object
        """

        # 连接kafka,指定组为test_group
        consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)],
                                 sasl_mechanism="PLAIN",
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_plain_username=self.username,
                                 sasl_plain_password=self.password,
                                 )

        return consumer
        # for msg in consumer:
        #     recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)

    def main(self):
        startime = time.time()  # 开始时间

        client = KafkaClient(self.kafka_server, self.port, self.topic, self.content,self.username,self.password)  # 实例化客户端
        client.producer()  # 执行生产者
        print('执行生产者')
        consumer = client.consumer()  # 执行消费者
        print('执行消费者')
        print('等待结果....')
        flag = False
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            # 判断生产的消息和消费的消息是否一致
            print(msg.value)
            # print(self.content)
            if msg.value == self.content:
                flag = True
                break

        consumer.close()  # 关闭消费者对象
        endtime = time.time()  # 结束时间

        if flag:
            # %.2f %(xx) 表示保留小数点2位
            return "kafka验证消息成功,花费时间", '%.2f 秒' % (endtime - startime)
        else:
            return "kafka验证消息失败,花费时间", '%.2f 秒' % (endtime - startime)


if __name__ == '__main__':
    kafka_server = "kafka-1.default.svc.cluster.local"
    port = "9092"
    topic = "test"
    content = "hello honey".encode('utf-8')

    username = "writer"
    password = "123456"

    client = KafkaClient(kafka_server,port,topic,content,username,password)  # 实例化客户端
    print(client.main())

 

此时目录结构如下:

./
├── produer_consumer_acl_test.py
└── sources.list

进入容器,更新ubuntu更新源

root@jqb-node128:/opt/py_test# docker run -it -v /opt/py_test:/mnt --network br1 --ip=172.168.0.10 ubuntu:16.04
root@064f2f97aad2:/# cp /mnt/sources.list /etc/apt/ 
root@064f2f97aad2:/# apt-get update

安装Python3-pip

root@064f2f97aad2:/# apt-get install -y python3-pip

 

安装kafka模块

root@064f2f97aad2:/# pip3 install kafka

 

添加hosts记录

echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts

执行Python文件

root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '28.59 秒')


注意:第一次执行时,会非常慢。等待30秒,如果没有输出hello honey。终止掉,再次执行。

反复5次。就可以了!

 

之后再次执行几次,就会很快了!

root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '5.37 秒')
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py 
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '0.43 秒')

 

为啥,前面几次会很慢。之后就很快了,什么原因,我也不知道!

总之,只要经历过慢的阶段,之后就很快了!

 

本文参考链接:

http://blog.51cto.com/xiaoyouyou/2061143


--结束END--

本文标题: Kafka 集群配置SASL+ACL

本文链接: https://lsjlt.com/news/193543.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Kafka 集群配置SASL+ACL
    在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。...
    99+
    2023-01-31
    集群 Kafka ACL
  • 基于SASL+ACL 的Kafka集群内外网访问
    需求:  阿里云ECS服务器上部署了Kafka伪集群,要求内网其他机器访问时broker走内网网卡,外部用户访问均走公网网卡IP:  内网:10.130.10.10  &nbs...
    99+
    2024-04-02
  • Kafka中的ACL如何配置
    Apache Kafka中的ACL(Access Control List)用于控制用户对主题的访问权限。ACL可以配置在broke...
    99+
    2024-04-02
  • Ubuntu-16.04中怎么配置Apache Kafka集群
    这期内容当中小编将会给大家带来有关Ubuntu-16.04中怎么配置Apache Kafka集群,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Apache Kafka是一个免费的开源流处理软件平台,由Ap...
    99+
    2023-06-02
  • kafka集群搭建
     一个典型的Kafka 集群中包含若干Producer(可以是web 前端产生的Page View,或者是服务器日志,系统 CPU、Memory 等),若干broker(Kafka 支持水平扩展,一般broker 数量越多,集群吞...
    99+
    2023-01-31
    集群 kafka
  • Reids配置集群
    为什么要有集群之前我们已经讲了主从的概念,一主可以多从,如果同时的访问量过大(1000w),主服务肯定就会挂掉,数据服务就挂掉了或者发生自然灾难大公司都会有很多的服务器(华东地区、华南地区、华中地区、华北地...
    99+
    2024-04-02
  • kafka---- zookeeper集群搭建
    1.准备工作:##准备3个节点,要求配置好主机名称,服务器之间系统时间保持一致##注意/etc/hostname 和/etc/hosts 配置主机名称(在这个里我准备Prac-zk-133, Prac-zk-134 , Prac-zk-13...
    99+
    2023-08-30
    kafka zookeeper debian
  • elasticsearch集群配置(PreviewVersion) - G
    elasticsearch集群配置(PreviewVersion)   准备 首先需要在每个节点有可以正常启动的单节点elasticsearch   elasticsearch集群配置仅需要在elasticsearch.yml添加...
    99+
    2020-01-16
    elasticsearch集群配置(PreviewVersion) - G
  • mysql主从集群配置
    先编辑master配置:vim /etc/my.cnf(加上红框部分。#server id部分:每个都有独一无二的id作为区分,这id可以用服务器ip后三位。#binary log:日志。#statemen...
    99+
    2024-04-02
  • MySQL集群如何配置
    这篇文章将为大家详细讲解有关MySQL集群如何配置,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、介绍========在介绍如何安装配置基于2台服务器的MySQL集群。...
    99+
    2024-04-02
  • 怎么配置ZooKeeper集群
    在配置ZooKeeper集群之前,首先需要确认每台服务器的主机名、IP地址和端口号,并且确保它们之间可以相互通信。以下是配置ZooK...
    99+
    2024-04-02
  • docker如何部署kafka集群
    要部署Kafka集群,可以使用Docker来简化整个过程。下面是一个基本的步骤:1. 安装Docker和Docker Compose...
    99+
    2023-10-08
    kafka docker
  • Linux怎么关闭kafka集群
    要关闭Kafka集群,您需要依次停止各个Kafka节点。以下是关闭Kafka集群的步骤:1. 登录到Kafka集群的每个节点上。2....
    99+
    2023-10-08
    kafka linux
  • docker如何搭建kafka集群
    本文小编为大家详细介绍“docker如何搭建kafka集群”,内容详细,步骤清晰,细节处理妥当,希望这篇“docker如何搭建kafka集群”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。一、原生Docker命令 ...
    99+
    2023-06-30
  • java怎么连接kafka集群
    要连接Kafka集群,您需要使用Kafka的Java客户端库,并在代码中配置正确的连接参数。以下是一个示例代码片段,展示了如何连接到...
    99+
    2023-10-20
    java kafka
  • mongo副本集集群安装配置
    新建用户和目录 useradd mongodb mkdir  -p /comm/mg10000/data mkdir  -p /comm/mg10001/data mkdir...
    99+
    2024-04-02
  • MySQL5.7 集群配置的步骤
    本次针对的MySQL版本为5.7,首先分别在A服务器和B服务器上安装MySQL,可以通过yum安装也可以通过wget下载直接编译安装。安装方式可以多种多样,但必须要确保安装成功。 1.修改A服务器的my.cnf文件 ...
    99+
    2022-05-19
    MySQL 集群配置
  • GBase8s HAC集群配置方法
    GBase8s 同城灾备高可用集群配置方法 Tips :      一个 GBase8s 同城灾备高可用 集群中最多只能有一个 同城灾备 节点。 ...
    99+
    2024-04-02
  • MySQL集群的配置方法
    本篇内容介绍了“MySQL集群的配置方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成! 一、介绍 ==...
    99+
    2024-04-02
  • Jboss集群的安装配置
    这篇文章主要介绍“Jboss集群的安装配置”,在日常操作中,相信很多人在Jboss集群的安装配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Jboss集群的安装配置”的疑惑...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作