一站式Kafka平台解决方案——KafkaCenter

一站式Kafka平台解决方案——KafkaCenterKafkaCenter是什么 KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。 对于Kafka的平

一站式Kafka平台解决方案——KafkaCenter

一站式Kafka平台解决方案——KafkaCenter

KafkaCenter是什么

KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。

对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,KSQL还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。

对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护,kafka的运维,比如:

  • 人工创建topic,特别费力
  • 相关kafka运维,监控孤岛化
  • 现有消费监控工具监控不准确
  • 无法拿到Kafka 集群的summay信息
  • 无法快速知晓集群健康状态
  • 无法知晓业务对team kafka使用情况
  • kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
  • 无法快速查询topic消息

一站式Kafka平台解决方案——KafkaCenter

功能模块介绍

  • Home-> 查看平台管理的Kafka Cluster集群信息及监控信息
  • Topic-> 用户可以在此模块查看自己的Topic,发起申请新建Topic,同时可以对Topic进行生产消费测试。
  • Monitor-> 用户可以在此模块中可以查看Topic的生产以及消费情况,同时可以针对消费延迟情况设置预警信息。
  • Connect-> 实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
  • KSQL-> 实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
  • Approve-> 此模块主要用于当普通用户申请创建Topic,管理员进行审批操作。
  • Setting-> 此模块主要功能为管理员维护User、Team以及kafka cluster信息
  • Kafka Manager-> 此模块用于管理员对集群的正常维护操作。

系统截图:

一站式Kafka平台解决方案——KafkaCenter

安装与入门

安装需要依赖 mysql es email server

组件 是否必须 功能
mysql 必须 配置信息存在mysql
elasticsearch(7.0+) 可选 各种监控信息的存储
email server 可选 Apply, approval, warning e-mail alert

1、初始化

在MySQL中执行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT "",
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT "",
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT "",
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "100",
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

代码100分

2、配置

相关配置位于application.properties

可对端口 日志等信息做一些修改

代码100分server.port=8080
debug=false
# 设置session timeout为6小时
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否启用收集线程指定集群收集
monitor.collector.include.enable=false
#收集线程指定location,必须属于remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是为了提高lag查询和收集,解决跨location网络延迟问题
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#发送consumer group的lag发送给alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否开启邮件功能,true:启用,false:禁用
mail.enable=false
spring.mail.host=
spring.mail.username=KafkaCenter@xaecbd.com
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3、运行

推荐使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

代码100分$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenterKafkaCenter-Core	arget
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4、查看

访问http://localhost:8080 管理员用户与密码默认:admin / admin

功能介绍

Topics

用户可以在此模块完成Topic查看,已经申请新建Topic,同时可以对Topic进行生产消费测试。

Monitor

用户可以在此模块中可以查看Topic的生成以及消费情况,同时可以针对消费延迟情况设置预警信息。

Alerts

此模块用于维护预警信息。用户可以看到自己所有预警信息,管理员可以看到所有人的预警信息。

Kafka Connect

实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。

KSQL

实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。

Approve

此模块主要用于当普通用户申请创建Topic 或者Job时,管理员进行审批操作。

Setting

此模块主要功能为管理员维护User、Team以及kafka cluster信息

Cluster Manager

此模块用于管理员对集群的正常维护操作。

Home

这里是一些基本的统计信息

一站式Kafka平台解决方案——KafkaCenter

My Favorite

集群与topic列表

一站式Kafka平台解决方案——KafkaCenter

Topic

这里是一些topic的管理功能

Topic List

操作范围:

用户所属Team的所有Topic

  • Topic -> Topic List -> Detail 查看Topic的详细信息
  • Topic -> Topic List -> Mock 对Topic进行生产测试

一站式Kafka平台解决方案——KafkaCenter

一站式Kafka平台解决方案——KafkaCenter

申请创建topic

Important: admin不能申请task,普通用户必须先让管理员新建team后,将用户加入指定team后,才可以申请task。

操作范围:

用户所属Team的所有Task

  • Topic -> My Task -> Detail 查看申请的Task信息

  • Topic -> My Task -> Delete 删除被拒绝或待审批的Task

  • Topic -> My Task -> Edit 修改被拒绝的Task

  • Topic -> My Task -> Create Topic Task 创建Task

    • 按照表单各字段要求填写信息
    • 点击确认,提交申请

    审批结果:

    • 审批通过:Topic将会被创建在管理员指定的集群
    • 审批拒绝:用户收到邮件,返回到My Task,点击对应Task后面的Edit,针对审批意见进行修改

Topic命名规则:

只能包含:数字、大小写字母、下划线、中划线、点;长度大于等于3小于等于100。

不推荐:下划线开头;

一站式Kafka平台解决方案——KafkaCenter

可对所有Topic进行消费测试

一站式Kafka平台解决方案——KafkaCenter

Monitor

监控模块

生产者监控

一站式Kafka平台解决方案——KafkaCenter

消费者监控

一站式Kafka平台解决方案——KafkaCenter

消息积压

一站式Kafka平台解决方案——KafkaCenter

报警功能

一站式Kafka平台解决方案——KafkaCenter

Connect

这里是一些Connect的操作

一站式Kafka平台解决方案——KafkaCenter

一站式Kafka平台解决方案——KafkaCenter

KSQL

可以进行KQL的查询操作

一站式Kafka平台解决方案——KafkaCenter

Approve

这里主要是管理员做一些审核操作

  • Approve->check 审批用户的Task
  • 根据用户选择的location指定cluster
  • 检查用户设置的partition和replication大小是否合理,如不合理做出调整
  • 检查其他字段是否合理,如需要拒绝该申请,点击Reject并填写意见。

一站式Kafka平台解决方案——KafkaCenter

Kafka Manager
Topic管理

一站式Kafka平台解决方案——KafkaCenter

Cluster管理

一站式Kafka平台解决方案——KafkaCenter

broker管理

一站式Kafka平台解决方案——KafkaCenter

group管理

一站式Kafka平台解决方案——KafkaCenter

Setting

这些主要是用户的一些设置

一站式Kafka平台解决方案——KafkaCenter

KafkaCenter还是一个非常不错的kafka管理工具,可以满足大部分需求。
更多实时数据分析相关博文与科技资讯,欢迎关注 “实时流式计算”

一站式Kafka平台解决方案——KafkaCenter

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/8396.html

(0)
上一篇 2023-02-28
下一篇 2023-02-28

相关推荐

  • Python 3 教程「终于解决」

    Python 3 教程「终于解决」Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python …

    2023-03-31
    164
  • Python如何停止线程

    Python如何停止线程在Python中,线程是一种非常常见且强大的工具。线程可以在同一进程中并行执行多个任务,以提高程序的性能和响应速度。但是,在某些情况下,我们需要停止线程。在这篇文章中,我们将学习如何在Python中停止线程。

    2024-06-25
    41
  • PG-跨库操作_pg copy

    PG-跨库操作_pg copy在PostgreSQL数据库之间进行跨库操作的方式 dblink postgres_fdw 本文先说说dblink;dblink是一个支持从数据库会话中连接到其他PostgreSQL数据库的插件。在其

    2023-03-22
    160
  • 如何清除sql2008r2日志_sql2008怎么手动删除数据

    如何清除sql2008r2日志_sql2008怎么手动删除数据SQL2008 的收缩日志 由于SQL2008对文件和日志管理进行了优化,所以以下语句在SQL2005中可以运行但在SQL2008中已经被取消:SQL2005 清空日志的方法:Backup&

    2023-02-17
    148
  • mysql 分隔字符串的函数_Mysql 字符串分隔函数

    mysql 分隔字符串的函数_Mysql 字符串分隔函数/***字符串分隔方法*获取字符串分隔之后的数组长度*/DROPFUNCTIONIFEXISTS`func_get_split_total`;DELIMITER;;CREATEFUNCTION`func_get_split_total`(f_stringtext,#长度不够会导致临时表数据不全,#mysql字符类型varchar有长度限制(最大65535),改用text文…

    2023-03-01
    147
  • wm_concat()_oracle wm_concat排序

    wm_concat()_oracle wm_concat排序wm_concat(column)结果为的解决办法 某个column的逻辑是如有多个则用英文逗号隔开; 上网查询资料,是用oracle自带的wm_concat()函数; 但select出…

    2022-12-16
    240
  • 在Python中声明数组

    在Python中声明数组在编程中,数组是一种非常常见的数据结构。数组可以存储多个相同类型的数据,并可以通过下标索引来访问数组中的元素。在Python中,声明数组并不像C语言一样需要事先指定数组的大小,这是因为Python中的数组是一种动态的数据类型,可以自动调整大小以适应存储的元素。

    2024-05-10
    54
  • 一个字符串转数字的小功能

    一个字符串转数字的小功能with t as ( select '-' as col1 –isnumeric('-')这里会判断为数字,所以不能用 union all select '

    2023-02-17
    153

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注