SparkStreaming两种方式连接Flume

SparkStreaming两种方式连接FlumeSparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数

SparkStreaming两种方式连接Flume

SparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark 拉取 Flume的输出数据);

 Flume向SparkStreaming推送数据没有研究明白,有大佬指点一下吗?

万分感谢!

1.Spark拉取Flume数据:

导入两个jar包到flume/lib下

SparkStreaming两种方式连接Flume

 否则抛出这两个异常:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!

2.编写flume 工作文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# 当前主机端口
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.编写SparkStreaming程序:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @ClassName: StreamingFlume
 * @Description TODO 实时监控flume,统计flume数据产生,是Spark
 * @Author: Charon 
 * @Date: 2021/4/7 13:19
 * @Version 1.0
 **/
object StreamingFlume {

  def main(args: Array[String]): Unit = {
    //1.创建SparkConf对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
    //2.创建SparkContext对象
    val sc = new SparkContext(conf)
    //设置日志输出格式,只打印异常日志,在这里设置没有用
    //sc.setLogLevel("WARN")
    //3.创建StreamingContext,Seconds(5):轮询机制,多久执行一次
    val ssc = new StreamingContext(sc, Seconds(5))
    //4.定义一个flume集合,可以接受多个flume数据,多个用,隔开需要new
    val addresses = Seq(new InetSocketAddress("127.0.0.1", 5555))
    //5.获取flume中的数据,
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
    // 6.截取flume数据:{"header":xxxxx   "body":xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 4。开启flume监控文件,开启SparkStreaming程序:

向指定目录上传文件

SparkStreaming两种方式连接Flume

 

 

SparkStreaming两种方式连接Flume

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/6420.html

(0)
上一篇 2023-04-10
下一篇 2023-04-10

相关推荐

  • SpringMVC使用Redis共享session

    SpringMVC使用Redis共享session在使用之前,请确认项目已经整合了Redis一、加入依赖 org.springframework.session spring-session-data-redis 二、加入注解在加了@SpringBo

    2022-12-23
    237
  • 使用Python replace替换多个字符

    使用Python replace替换多个字符Python是一门强大的编程语言,它拥有很多操作字符串的内置函数,其中replace()函数就是其中之一。replace()函数可以将字符串中的一个字符替换为另一个字符。但是当我们需要替换多个字符时,我们该怎么做呢?在本文中,我们将介绍如何使用Python replace()函数替换多个字符。

    2024-05-12
    75
  • 数据库中Truncate、Delete、Drop区别

    数据库中Truncate、Delete、Drop区别TRUNCATE 用法: TRUNCATE TABLE 表名 DDL语句,删除内容、释放空间,保留表结构。删除表数据,不能删除行数据。 DELETE 用法: DELETE TABLE 表名 WHER…

    2023-04-09
    180
  • Redis基础命令「建议收藏」

    Redis基础命令「建议收藏」Redis基础命令 Redis数据结构介绍 redis是一个key-value的数据库,key一般是String类型,但是value的类型多种多样 前五种是基本类型: String:Hello Wor

    2023-06-18
    158
  • Jupyter安装方法

    Jupyter安装方法Jupyter是一款支持多种编程语言的交互式计算环境,可以帮助用户轻松地将代码、文本、图像、视频等多种形式的内容融合在一起,进行数据分析和数据可视化。

    2024-05-11
    77
  • 使用Python ttk来实现GUI界面快速开发

    使用Python ttk来实现GUI界面快速开发Graphical User Interface, 简称 GUI,是指采用图形方式显示应用程序的程序界面,用户可以通过鼠标、键盘等与应用程序进行直接交互。

    2024-01-14
    107
  • springboot web项目中配置Mybatis[通俗易懂]

    springboot web项目中配置Mybatis[通俗易懂]只是先创建了一个带web功能的springboot项目(创建步骤省略。。。) 没有任何WEB代码,只是在主配置类中打印ok。run时发现:Failed to configure a DataSour…

    2023-02-03
    155
  • Python图像二值化

    Python图像二值化a href=”https://beian.miit.gov.cn/”苏ICP备2023018380号-1/a Copyright www.python100.com .Some Rights Reserved.

    2024-05-09
    67

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注