大量数据迁移_clickhouse 时序数据

大量数据迁移_clickhouse 时序数据45亿数据迁移记录后续-日数据量千万级别到clickhouse 相关文档地址 flume 参考地址 waterdrop 参考地址 clickhouse 参考地址 kafka 参考地址 环境 日志在一…

45亿数据迁移记录后续-日数据量千万级别到clickhouse

  • 思路2 : 扔掉flume,使用waterdrop直接读取日志,清洗处理,然后直接到clickhous。关于waterdrop处理日志,之前博客,可翻阅进行参考。

    这个方案是错误的,因为我之前并没有正确了解waterdrop,waterdrop并不支持实时数据处理,所有针对日志文件,是不可行的, 虽然我这边已经处理,并且从日志中清洗得到了需要的数据。

  • 思路3 : flume 读取log 日志,清洗得到数据,然后写入kafka,关于kafka 与 clickhouse 的交互,clickhouse 这边已经提供了支持,相关查看,clickhouse 的外部表引擎。然后在clickhouse建议kafka 的引擎,对与kafka进行连接,然后建立物化视图,进行实时消费。

    这个方案是可行的,其中在使用clickhouse de 外部表引擎kafka 的时候,遇到了很多问题,最后查看官方的提问,以及作者的回复,得以解决。

  • 思路4 : flume 读取 数据落在kafka,然后使用waterdrop进行定时的消费kafka,offset 有kafka自动管理进行记录。

    未进行测试验证。但是waterdrop 群里有人做过。

  • 实现
    • flume 的 avro 需要注意相关。
      • flume 发送方的slink 需要填写的是接受方的ip,以及端口。我这边本地发送方是 104 ,接受方式118。 相关配置

        # 定义三大组件的名称
        agent1.sources = source1
        agent1.sinks = sink1
        agent1.channels = channel1
        
        # 配置 source 组件
        agent1.sources.source1.type = exec
        agent1.sources.source1.command = tail -F /home/logs/gps.log
        
        # 将数据流复制给多个channel
        #agent1.sources.source1.selector.type = replicating
        
        # 配置 sink 组件
        agent1.sinks.sink1.type = avro
        agent1.sinks.sink1.hostname = 192.168.108.118
        agent1.sinks.sink1.port = 4141
        agent1.sinks.sink1.request-timeout = 500000
        
        # 配置 channel 组件
        agent1.channels.channel1.type = memory
        agent1.channels.channel1.capacity = 100000
        agent1.channels.channel1.transactionCapacity = 10000
        agent1.channels.channel1.keep.alive = 80
        
        # 给 source 和 sink 绑定 channel
        agent1.sources.source1.channels = channel1
        agent1.sinks.sink1.channel = channel1
        
        agent1.sources.source1.interceptors = filter1 search-replace1
        agent1.sources.source1.interceptors.filter1.type = REGEX_FILTER
        agent1.sources.source1.interceptors.filter1.regex = (接收到的数据为:)
        agent1.sources.source1.interceptors.filter1.excludeEvents = false
        
        #agent1.sources.source1.interceptors.filter2.type = REGEX_FILTER
        #agent1.sources.source1.interceptors.filter2.regex = (接收到的数据为:)
        #agent1.sources.source1.interceptors.filter2.excludeEvents = false
        
        #agent1.sources.source1.interceptors = search-replace1
        agent1.sources.source1.interceptors.search-replace1.type = search_replace
        agent1.sources.source1.interceptors.search-replace1.searchPattern = [^{]*(?=\{)
        agent1.sources.source1.interceptors.search-replace1.replaceString =
        
        

        代码100分

      • 这里的source 就是读取的日志文件地址

      • channel 配置的规则是 capacity > transactionCapacity > batchSize

      • 接收方的配置如下

        代码100分#定义三大组件的名称
        agent1.sources = source1
        agent1.sinks = sink1
        agent1.channels = channel1
        
        
        # 配置 source 组件
        agent1.sources.source1.type = avro
        # 要监听的 hostname 或者IP地址
        agent1.sources.source1.bind = 192.168.108.118
        agent1.sources.source1.port = 4141
        
        
        # 配置 sink 组件
        agent1.sinks.sink1.type = hdfs
        agent1.sinks.sink1.hdfs.path = hdfs://cluster1/user/oracle/%Y-%m-%d
        #Flume在HDFS文件夹下创建新文件的固定前缀
        agent1.sinks.sink1.hdfs.filePrefix = access_log
        #允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
        #agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
        #向 HDFS 写入内容时每次批量操作的 Event 数量
        agent1.sinks.sink1.hdfs.batchSize= 5000
        #文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
        agent1.sinks.sink1.hdfs.fileType = DataStream
        #文件写入格式
        agent1.sinks.sink1.hdfs.writeFormat = Text
        #替换转义序列时是否使用本地时间戳
        agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
        #Flume在HDFS文件夹下创建新文件的后缀
        #agent1.sinks.sink1.hdfs.fileSuffix = .txt
        #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件)
        agent1.sinks.sink1.hdfs.rollSize = 0
        #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
        agent1.sinks.sink1.hdfs.rollInterval = 0
        #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
        agent1.sinks.sink1.hdfs.rollCount = 0
        #允许HDFS操作的毫秒数,例如:open,write, flush, close。如果很多HFDS操作超时,这个配置应该增大。
        agent1.sinks.sink1.hdfs.callTimeout = 480000
        #每个HDFS sink的HDFS的IO操作线程数(例如:open,write)
        agent1.sinks.sink1.hdfs.threadsPoolSize = 10
        #在发起一个关闭命令后,HDFS sink必须尝试重命名文件的次数
        agent1.sinks.sink1.hdfs.closeTries = 0
        #在几秒钟之间连续尝试关闭文件
        agent1.sinks.sink1.hdfs.retryInterval = 60
        
        
        # 配置 channel 组件
        agent1.channels.channel1.type = memory
        agent1.channels.channel1.transactionCapacity = 10000
        agent1.channels.channel1.keep.alive = 80
        agent1.channels.channel1.capacity = 100000
        
        
        
        # 给 source 和 sink 绑定 channel
        agent1.sources.source1.channels = channel1
        agent1.sinks.sink1.channel = channel1
        
        
      • 接收方的source 这里是118自己,因为它需要把自己的端口发布出去,114 才可以连接上,进行传输。这里建议填写自己的ip,不要填写localhost以及127.0.0.0

      • 发送读取日志文件的时候,需要注意,是大写的F,注意这里与 linux 的区分。

    • kafka 相关问题以及集群的搭建。
      • 这里的同事之前都搭建好了,直接使用就可以了。相关写入的flume 配置文件如下(这里只是展示 sink1 的配置)

        # 配置 sink 组件
        agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
        #agent1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
        agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.1.8.111:9092,192.168.108.112:9092,192.168.108.113:9092,192.168.108.118:9092,192.168.108.119:9092
        # agent1.sinks.sink1.brokerList = master:9092,slave1:9092,slave2:9092
        agent1.sinks.sink1.topic = s
        agent1.sinks.sink1.kafka.flumeBatchSize = 20
        agent1.sinks.sink1.kafka.producer.acks = 1
        agent1.sinks.sink1.kafka.producer.linger.ms = 1
        #agent1.sinks.sink1.kafka.compression.type = snappy
        
        
      • kafka 相关命令以及原理,参考上面的文档地址

    • clickhouse 相关
      • kafka 的外部表引擎

        代码100分CREATE TABLE kafka_structure_test (encrypt String,date DateTime,lon String,lat String,vec1 String,vec2 String,vec3 String,direction String,altitude String,state String,alarm String,vehicleNo String,vehicleColor String,id String,createBy String,createDt DateTime) ENGINE = Kafka SETTINGS kafka_broker_list = "192.168.108.118:9092,192.168.108.119:9092",  kafka_topic_list = "wl_vehicle_data_clean",  kafka_group_name = "wl_vehicle_data_up", kafka_format = "JSONEachRow", kafka_row_delimiter = "
        ", kafka_num_consumers = 1,kafka_max_block_size = 500;
        
        
      • kafka 引擎相关文档参考 文档地址

      • 物化视图建立

        CREATE MATERIALIZED VIEW consumer TO  t_plt_vehicle_location_test  AS  select id,encrypt,date as up_date,lon,createBy as create_by,createDt as create_dt,lat,vec1,vec2,vec3,direction,altitude,state,alarm, vehicleNo as vehicleno,vehicleColor as vehiclecolor from  kafka_structure_test ;
        
      • clickhouse 的测试表建立

        create table t_plt_vehicle_location_test
        (
        id String default "MSG0",
        encrypt String default "0",
        up_date DateTime default "1970-01-01 00:00:01",
        lon String default -1,
        create_by String default "UP_EXG_MSG_REAL_LOCATION",
        create_dt DateTime default now() ,
        lat String default -1,
        vec1 String default -1,
        vec2 String default -1,
        vec3 String default -1,
        direction String default -1,
        altitude String default -1,
        state String default -1,
        alarm String default -1,
        vehicleno String default "-1",
        vehiclecolor String default "-1",
        alarm_code  String default "-1"
        )
        ENGINE = MergeTree() partition by toYYYYMM(up_date) ORDER BY
        (vehicleno,up_date) SETTINGS index_granularity = 8192
        
      • 我们这边在使用clickhouse 的kafka 引擎的时候,遇到了一个问题,kafka 引擎连接上kafka 以后,隔一段时间就自动掉线,连接中断无法消费,后面查看github 的相关提问,发现这是clickhouse 的相关bug, 修复好是在 ClickHouse 19.13.2.19 版本。相关链接

      • 后续其他问题,也可以在上面进行查看。报错查看clickhouse 的日志。或者在kafka 查看消费者消费的offset 即可看到 kafka 引擎是否还在连接kafka 进行消费。

  • 相关问题
    • clickhouse 的问题,clickhouse 相关对于kafka 的支持相关还不是很稳定,我们这边查看升级的版本也是19年年底才修复的问题。
    • 遇到小的问题,别人都没遇到的问题,请在三仔细查看,因为那多半是你犯了基础的低级错误,而别人没犯。所以你问别人,别人多半都不知道。
    • 细心,在细心。遇到错误,不要相信之前的逻辑判断。从新进行分析梳理。一点点的验证。
  • 版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
    转载请注明出处: https://daima100.com/10174.html

    (0)
    上一篇 2023-01-24 11:00
    下一篇 2023-01-24

    相关推荐

    • Jupyter安装方法

      Jupyter安装方法Jupyter是一款支持多种编程语言的交互式计算环境,可以帮助用户轻松地将代码、文本、图像、视频等多种形式的内容融合在一起,进行数据分析和数据可视化。

      2024-05-11
      78
    • Spyder安装第三方库

      Spyder安装第三方库Spyder是一个开源的Python开发环境,旨在帮助研究人员和程序员更有效地编写Python代码。Spyder集成了许多有用的工具和库,如IPython控制台、文件编辑器、查找和替换功能、调试器和可视化工具。

      2024-06-25
      51
    • mysql整理_MySQL架构

      mysql整理_MySQL架构1.在 数据库内如何让自动增加字段从0开始 2.表中有A B C三列时,用SQL语句实现:当A列大于B列时选择A列,否则选择B列,当B列大于C列时选择B,否则选择C列 3. 是一个 存储系统。支持五种

      2023-02-10
      186
    • Python字典:快速索引数据,实现高效运算

      Python字典:快速索引数据,实现高效运算Python字典是一种灵活且高效的数据结构,可以用于存储和操作键-值对,其中键必须是唯一的,且不可变的数据类型(例如字符串、数字、元组)。字典是Python标准库中的内置类型之一,使用花括号{}表示。

      2023-12-16
      108
    • 如何打开Python?

      如何打开Python?Python是一种高级编程语言,已经成为数据科学、机器学习、Web开发以及许多其他领域中最受欢迎的编程语言之一。如果你想要开始使用Python,首先要打开Python。在这篇文章中,我们将介绍如何打开Python,以及如何使用Python的解释器和IDE。

      2024-04-19
      72
    • 远程连接centos图形界面_xshell远程服务器可视化界面

      远程连接centos图形界面_xshell远程服务器可视化界面安装Xmanager 5 下载链接:https://pan.baidu.com/s/1JwBk3UB4ErIDheivKv4-NA 提取码:cw04 双击xmgr5_wm.exe进行安装 点击‘下一…

      2023-03-26
      161
    • Python bytes转str方法详解

      Python bytes转str方法详解在Python中,bytes和str是两种最基本的数据类型,它们经常在文件 I/O 或网络传输过程中使用。在这些操作中,bytes类型用于表示二进制数据,而str类型则用于表示文本数据。

      2024-08-18
      29
    • mysql全局变量和局部变量「建议收藏」

      mysql全局变量和局部变量「建议收藏」全局变量和局部变量 在服务器启动时,会将每个全局变量初始化为其默认值(可以通过命令行或选项文件中指定的选项更改这些默认值)。然后服务器还为每个连接的客户端维护一组会话变量,客户端的会话变量在连接时使用

      2022-12-16
      123

    发表回复

    您的电子邮箱地址不会被公开。 必填项已用*标注