kafka datahub_hadoop HA

kafka datahub_hadoop HA一、概述 EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topi

大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)

目录
  • 一、概述
  • 二、EFAK架构
  • 三、EFAK数据采集原理
  • 四、安装Kafka
    • 1)Kafka下载
    • 2)配置环境变量
    • 3)创建logs目录
    • 4)修改kafka配置
    • 5)修改zookeeper配置
    • 6)配置Zookeeper myid
    • 7)开启Kafka JMX监控
    • 8)将kafka目录推送到其它节点
    • 9)启动服务
  • 五、安装EFAK
    • 1)下载EFAK
    • 2)创建数据库
    • 2)设置环境变量
    • 3)配置
    • 4)调整启动参数
    • 6)修改 works配置
    • 7)将EFAK推送其它节点
    • 8)启动
  • 六、简单使用
    • 1)Kafka CLI简单使用
    • 2)EFAK常用命令

一、概述

EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。

源码: https://github.com/smartloli/kafka-eagle/
下载: http://download.kafka-eagle.org/
官方文档:https://www.kafka-eagle.org/articles/docs/documentation.html

二、EFAK架构

EFAK分布式模式部署,这里以5个节点为例子(1个Master和4个Slave),各个节点的角色如下如所示:

kafka datahub_hadoop HA

三、EFAK数据采集原理

对于 Kafka,我们可以收集以下数据

  • Kafka broker常用机器加载信息:内存、cpu、IP、版本等。
  • 服务监控数据:TPS、QPS、RT等
  • 应用程序监控:组、消费者、生产者、主题等。

kafka datahub_hadoop HA

因为EFAK是kafka的监控系统,所以前提是需先安装Kafka和Zookeeper。

四、安装Kafka

kafka官网文档:https://kafka.apache.org/documentation/

1)Kafka下载

$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.1/kafka_2.13-3.1.1.tgz
$ tar -xf kafka_2.13-3.1.1.tgz -C /opt/bigdata/hadoop/server/

2)配置环境变量

$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

3)创建logs目录

$ mkdir $KAFKA_HOME/logs

4)修改kafka配置

$ cd $KAFKA_HOME
# 查看现有配置,去掉空行和注释
$ cat config/server.properties |grep -v "^$|^#"
$ cat > $KAFKA_HOME/config/server.properties <<EOF
#broker的全局唯一编号,不能重复
broker.id=0

#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF

5)修改zookeeper配置

# 创建zookeeper data和logs目录
$ mkdir $KAFKA_HOME/zookeeper_data $KAFKA_HOME/zookeeper_logs 
$ vi $KAFKA_HOME/config/zookeeper.properties
# 配置主要修改如下:
#数据目录
dataDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_data
#日志目录
dataLogDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_logs
#心跳间隔时间,zookeeper中使用的基本时间单位,毫秒值。每隔2秒发送一个心跳,session时间tickTime*2
tickTime=2000
#leader与客户端连接超时时间。表示5个心跳间隔
initLimit=5
#Leader与Follower之间的超时时间,表示2个心跳间隔
syncLimit=2
#客户端连接端口,默认端口2181
clientPort=12181
# zookeeper集群配置项,server.1,server.2,server.3是zk集群节点;hadoop-node1,hadoop-node2,hadoop-node3是主机名称;2888是主从通信端口;3888用来选举leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888

6)配置Zookeeper myid

# 在hadoop-node1配置如下:
$ echo 1 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $KAFKA_HOME/zookeeper_data/myid

7)开启Kafka JMX监控

# 在kafka-server-start.sh文件中添加export JMX_PORT="9988",端口自定义就行
$ vi $KAFKA_HOME/bin/kafka-server-start.sh

kafka datahub_hadoop HA

重启kafka

$ $KAFKA_HOME/bin/kafka-server-stop.sh ; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

【问题】如遇以下报错:

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn"t match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)

kafka datahub_hadoop HA

【解决】在server.properties找到log.dirs配置的路径。将该路径下的meta.properties文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。

8)将kafka目录推送到其它节点

$ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3节点上设置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

# 修改advertised.listeners,值改成对应的hostname或者ip

【温馨提示】修改hadoop-node2上server.properties文件的broker.id,设置为1和2,只要不重复就行,advertised.listeners地址改成对应机器IP。

9)启动服务

启动zookeeper集群之后再启动kafka集群

$ cd $KAFKA_HOME
# -daemon后台启动
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 默认端口2181,可以在配置自定义,这里修改为12181端口
$ netstat -tnlp|grep 12181

kafka datahub_hadoop HA

启动Kafka

$ cd $KAFKA_HOME
# 默认端口9092,这里修改成了19092,可以修改listeners和advertised.listeners
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ netstat -tnlp|grep 9092
$ jps

五、安装EFAK

1)下载EFAK

$ cd /opt/bigdata/hadoop/software
$ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
$ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/
$ tar -xf 

2)创建数据库

$ mysql -uroot -p 
123456

create database ke;

2)设置环境变量

$ vi /etc/profile
export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin

$ source /etc/profile

3)配置

这里设置hadoop-node1为master节点,其它两个节点为slave节点,修改参数

$ vi $KE_HOME/conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181

######################################
# kafka jmx 地址,默认Apache发布的Kafka基本是这个默认值,
# 对于一些公有云Kafka厂商,它们会修改这个值,
# 比如会将jmxrmi修改为kafka或者是其它的值,
# 若是选择的公有云厂商的Kafka,可以根据实际的值来设置该属性
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16

# EFAK webui port -- WebConsole port access address
efak.webui.port=8048

######################################
# EFAK enable distributed,启用分布式部署
######################################
efak.distributed.enable=true

# 设置节点类型slave or master
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=master
# deploy efak server address
efak.worknode.master.host=hadoop-node1
efak.worknode.port=8085

# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka

# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=true

# EFAK keeps data for 30 days by default
efak.metrics.retain=15

# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000

# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin


# 关闭自带的sqlite数据库,使用外部的mysql数据库
# Default use sqlite to store data
# efak.driver=org.sqlite.JDBC
# It is important to note that the "/hadoop/kafka-eagle/db" path must be exist.
# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# efak.username=root
# efak.password=smartloli

# 配置外部数据库
# (Optional) set mysql address
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

4)调整启动参数

EFAK默认启动内存大小为2G,考虑到服务器情况可以将其调小

## 在 efak 安装目录执行
$ vi $KE_HOME/bin/ke.sh
## 将 KE_JAVA_OPTS 最大最小容量调小,例如:
export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

6)修改 works配置

默认是localhost

$  cat >$KE_HOME/conf/works<<EOF
hadoop-node2
hadoop-node3
EOF

7)将EFAK推送其它节点

# hadoop-node1推送
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3配置环境变量修改节点类型为slave

8)启动

上面配置文件配置的是分布式部署,当然也可以单机跑,但是不建议单机跑

# 在master节点上执行
$ cd $KE_HOME/bin
$ chmod +x ke.sh 
# 单机版启动
$ ke.sh start
# 集群方式启动
$ ke.sh cluster start
$ ke.sh cluster restart

kafka datahub_hadoop HA

web UI:http://192.168.0.113:8048/
账号密码:admin/123456

kafka datahub_hadoop HA
kafka datahub_hadoop HA

六、简单使用

1)Kafka CLI简单使用

【增】添加topic

# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000
$ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

【查】

# 查看topic列表
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list
# 查看topic列表详情
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe
# 指定topic
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002
# 查看消费者组
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe  --group test002

【改】这里主要是修改最常用的三个参数:分区、副本,过期时间

# 修改分区,扩分区,不能减少分区
$ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2
# 修改过期时间,下面两行都可以
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000

# 修改副本数,将副本数修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002

kafka datahub_hadoop HA

【删】

$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092

【生成者】

$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

【消费者】

# 从头开始消费
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning
# 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100

【消费组】

$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002

【查看数据积压】

$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002

kafka datahub_hadoop HA

2)EFAK常用命令

$KE_HOME/bin/ke.sh启动脚本中包含以下命令:

命令 描述
ke.sh start 启动 EFAK 服务器。
ke.sh status 查看 EFAK 运行状态。
ke.sh stop 停止 EFAK 服务器。
ke.sh restart 重新启动 EFAK 服务器。
ke.sh stats 查看 linux 操作系统中的 EFAK 句柄数。
ke.sh find [ClassName] 在 jar 中找到类名的位置。
ke.sh gc 查看 EFAK 进程 gc。
ke.sh version 查看 EFAK 版本。
ke.sh jdk 查看 EFAK 安装的 jdk 详细信息。
ke.sh sdate 查看 EFAK 启动日期。
ke.sh cluster start 查看 EFAK 集群分布式启动。
ke.sh cluster status 查看 EFAK 集群分布式状态。
ke.sh cluster stop 查看 EFAK 集群分布式停止。
ke.sh cluster restart 查看 EFAK 集群分布式重启。

EFAK环境部署,和kafka的一些简单操作就到这里了,后续会分享KSQL和其它更详细的页面化操作,请小伙伴耐心等待~

原文地址:https://www.cnblogs.com/liugp/archive/2022/05/26/16307589.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5202.html

(0)
上一篇 2023-05-19
下一篇 2023-05-19

相关推荐

  • 优化Linux环境变量,加速Python程序

    优化Linux环境变量,加速Python程序Linux作为一种高效的操作系统,可以提供快速的运行环境。但是,随着用户数量和数据量的增加,大量的环境变量会降低系统性能,并使进程的启动速度变慢。可以通过以下方法来优化环境变量:

    2024-01-02
    104
  • mysql 8 安装整理「终于解决」

    mysql 8 安装整理「终于解决」1、下载地址 https://dev.mysql.com/downloads/mysql/ 参考文章 1、https://www.cnblogs.com/520BigBear/p/12764499….

    2023-03-28
    155
  • 远程工具 批量管理程序[亲测有效]

    远程工具 批量管理程序[亲测有效]远程工具 批量管理程序远程桌面是微软公司为了便于网络管理员管理维护服务器推出的一项服务。从windows 2000 server版本开始引入,网络管理员时候远程桌面连接器连接到网络任意一台开启了远程…

    2023-02-23
    144
  • 2022 IDC中国未来企业大奖优秀奖颁布,华为云数据库助力德邦快递获奖「终于解决」

    2022 IDC中国未来企业大奖优秀奖颁布,华为云数据库助力德邦快递获奖「终于解决」摘要:华为云数据库助力德邦快递打造的“基于数智融合的一站式物流供应链平台”项目从500多个项目中脱颖而出,荣获2022 IDC中国未来企业大奖优秀奖“未来智能领军者”。 本文分享自华为云社区《华为云数

    2023-06-06
    141
  • Python Library中心

    Python Library中心Python是一种能够处理从简单到复杂的大量数据的高级编程语言。它已经成为数据科学、人工智能、机器学习和Web开发行业中最为流行的编程语言之一。Python拥有一个强大的社区,为其提供了数百个库和模块。本文将介绍Python的核心库和最流行的第三方库,以及它们在不同应用程序领域中的特性和用途。

    2024-09-19
    14
  • Datahub新版本0.9.1更新,列级别数据血缘功能发布![亲测有效]

    Datahub新版本0.9.1更新,列级别数据血缘功能发布![亲测有效]大家好,我是独孤风。 近期Datahub进行了一次大的版本更新,从0.9版本以后Datahub也正式发布了列级别数据血缘的功能。 0.9.1版本又增加了,列的影响分析这个功能。 这样Datahub对于

    2023-06-13
    148
  • 用Python执行Shell命令的方法

    用Python执行Shell命令的方法在进行一次编码工作时,有时候需要在Python代码中执行Shell命令。这个功能对于需要调用系统命令的开发者非常重要。Python提供了很多方法来执行Shell命令,这篇文章将会介绍多种可以使用的方法,并为读者提供实践代码以帮助理解。

    2024-04-19
    70
  • Cassandra简单介绍和二进制安装[亲测有效]

    Cassandra简单介绍和二进制安装[亲测有效]Cassandra简介: Apache Cassandra最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架

    2023-02-11
    151

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注