kafkaCenter是什么 KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。 对于Kafka的平台化,一直缺少一个成熟的解决
KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。
对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,Ksql还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。
对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护,kafka的运维,比如:
功能模块介绍
系统截图:
组件 | 是否必须 | 功能 |
---|---|---|
mysql | 必须 | 配置信息存在mysql |
elasticsearch(7.0+) | 可选 | 各种监控信息的存储 |
email server | 可选 | Apply, approval, warning e-mail alert |
在MySQL中执行sql建表
-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` ;
USE `kafka_center`;
-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`threshold` int(11) DEFAULT NULL,
`dispause` int(11) DEFAULT NULL,
`mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
`WEBhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
`create_date` datetime DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`disable_alerta` tinyint(1) DEFAULT 0,
`enable` tinyint(1) NOT NULL DEFAULT 1,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL,
`zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`create_time` datetime DEFAULT NULL,
`comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`enable` int(11) DEFAULT NULL,
`broker_size` int(4) DEFAULT 0,
`kafka_version` varchar(10) COLLATE utf8_bin DEFAULT "",
`location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`graf_addr` varchar(255) COLLATE utf8_bin DEFAULT "",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) DEFAULT NULL,
`cluster_name` varchar(255) DEFAULT NULL,
`ksql_url` varchar(255) DEFAULT NULL,
`ksql_serverId` varchar(255) DEFAULT NULL,
`version` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT "",
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`partition` int(11) DEFAULT NULL,
`replication` int(11) DEFAULT NULL,
`message_rate` int(50) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
`create_time` datetime DEFAULT NULL,
`approved` int(11) DEFAULT NULL,
`approved_id` int(11) DEFAULT NULL,
`approved_time` datetime DEFAULT NULL,
`approval_opiNIOns` varchar(1000) COLLATE utf8_bin DEFAULT "",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`user_id` int(11) NOT NULL,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`partition` int(11) DEFAULT NULL,
`replication` int(11) DEFAULT NULL,
`ttl` bigint(11) DEFAULT NULL,
`config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`real_name` varchar(255) COLLATE utf8_bin DEFAULT "",
`email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
`role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "100",
`create_time` datetime DEFAULT NULL,
`passWord` varchar(255) COLLATE utf8_bin DEFAULT "",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
相关配置位于application.properties
可对端口 日志等信息做一些修改
server.port=8080
debug=false
# 设置session timeout为6小时
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false
public.url=Http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16
monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否启用收集线程指定集群收集
monitor.collector.include.enable=false
#收集线程指定location,必须属于remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是为了提高lag查询和收集,解决跨location网络延迟问题
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#发送consumer group的lag发送给alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否开启邮件功能,true:启用,false:禁用
mail.enable=false
spring.mail.host=
spring.mail.username=KafkaCenter@xaecbd.com
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=
推荐使用Docker
docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0
不用docker
$ git clone https://GitHub.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -DMaven.test.skip=true
$ cd KafkaCenterKafkaCenter-Core arget
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar
访问http://localhost:8080 管理员用户与密码默认:admin / admin
Topics
用户可以在此模块完成Topic查看,已经申请新建Topic,同时可以对Topic进行生产消费测试。
Monitor
用户可以在此模块中可以查看Topic的生成以及消费情况,同时可以针对消费延迟情况设置预警信息。
Alerts
此模块用于维护预警信息。用户可以看到自己所有预警信息,管理员可以看到所有人的预警信息。
Kafka Connect
实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
KSQL
实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
Approve
此模块主要用于当普通用户申请创建Topic 或者Job时,管理员进行审批操作。
Setting
此模块主要功能为管理员维护User、Team以及kafka cluster信息
Cluster Manager
此模块用于管理员对集群的正常维护操作。
这里是一些基本的统计信息
集群与topic列表
这里是一些topic的管理功能
操作范围:
用户所属Team的所有Topic
Important: admin不能申请task,普通用户必须先让管理员新建team后,将用户加入指定team后,才可以申请task。
操作范围:
用户所属Team的所有Task
Topic -> My Task -> Detail 查看申请的Task信息
Topic -> My Task -> Delete 删除被拒绝或待审批的Task
Topic -> My Task -> Edit 修改被拒绝的Task
Topic -> My Task -> Create Topic Task 创建Task
审批结果:
Topic命名规则:
只能包含:数字、大小写字母、下划线、中划线、点;长度大于等于3小于等于100。
不推荐:下划线开头;
可对所有Topic进行消费测试
监控模块
生产者监控
消费者监控
消息积压
报警功能
这里是一些Connect的操作
可以进行KQL的查询操作
这里主要是管理员做一些审核操作
这些主要是用户的一些设置
KafkaCenter还是一个非常不错的kafka管理工具,可以满足大部分需求。
更多实时数据分析相关博文与科技资讯,欢迎关注 “实时流式计算”
--结束END--
本文标题: 一站式Kafka平台解决方案——KafkaCenter
本文链接: https://lsjlt.com/news/6156.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0