实时计算框架:Spark集群搭建与入门案例

实时计算框架:Spark集群搭建与入门案例Spark是专为大规模数据处理而设计的,基于内存快速通用,可扩展的集群计算引擎,实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流,运算速度相比于MapReduce得到了显著的提高。

实时计算框架:Spark集群搭建与入门案例

一、Spark概述

1、Spark简介

Spark是专为大规模数据处理而设计的,基于内存快速通用,可扩展的集群计算引擎,实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流,运算速度相比于MapReduce得到了显著的提高。

2、运行结构

实时计算框架:Spark集群搭建与入门案例

Driver

运行Spark的Applicaion中main()函数,会创建SparkContext,SparkContext负责和Cluster-Manager进行通信,并负责申请资源、任务分配和监控等。

ClusterManager

负责申请和管理在WorkerNode上运行应用所需的资源,可以高效地在一个计算节点到数千个计算节点之间伸缩计算,目前包括Spark原生的ClusterManager、ApacheMesos和HadoopYARN。

Executor

Application运行在WorkerNode上的一个进程,作为工作节点负责运行Task任务,并且负责将数据存在内存或者磁盘上,每个 Application都有各自独立的一批Executor,任务间相互独立。

二、环境部署

1、Scala环境

安装包管理

[root@hop01 opt]# tar -zxvf scala-2.12.2.tgz
[root@hop01 opt]# mv scala-2.12.2 scala2.12

配置变量

[root@hop01 opt]# vim /etc/profile

export SCALA_HOME=/opt/scala2.12
export PATH=$PATH:$SCALA_HOME/bin

[root@hop01 opt]# source /etc/profile

版本查看

[root@hop01 opt]# scala -version

Scala环境需要部署在Spark运行的相关服务节点上。

2、Spark基础环境

安装包管理

[root@hop01 opt]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz
[root@hop01 opt]# mv spark-2.1.1-bin-hadoop2.7 spark2.1

配置变量

[root@hop01 opt]# vim /etc/profile

export SPARK_HOME=/opt/spark2.1
export PATH=$PATH:$SPARK_HOME/bin

[root@hop01 opt]# source /etc/profile

版本查看

[root@hop01 opt]# spark-shell

实时计算框架:Spark集群搭建与入门案例

3、Spark集群配置

服务节点

[root@hop01 opt]# cd /opt/spark2.1/conf/
[root@hop01 conf]# cp slaves.template slaves
[root@hop01 conf]# vim slaves

hop01
hop02
hop03

环境配置

[root@hop01 conf]# cp spark-env.sh.template spark-env.sh
[root@hop01 conf]# vim spark-env.sh

export JAVA_HOME=/opt/jdk1.8
export SCALA_HOME=/opt/scala2.12
export SPARK_MASTER_IP=hop01
export SPARK_LOCAL_IP=安装节点IP
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/opt/hadoop2.7/etc/hadoop

注意SPARK_LOCAL_IP的配置。

4、Spark启动

依赖Hadoop相关环境,所以要先启动。

启动:/opt/spark2.1/sbin/start-all.sh
停止:/opt/spark2.1/sbin/stop-all.sh

这里在主节点会启动两个进程:Master和Worker,其他节点只启动一个Worker进程。

5、访问Spark集群

默认端口是:8080。

http://hop01:8080/

实时计算框架:Spark集群搭建与入门案例

运行基础案例:

[root@hop01 spark2.1]# cd /opt/spark2.1/
[root@hop01 spark2.1]# bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.1.1.jar

运行结果:Pi is roughly 3.1455357276786384

三、开发案例

1、核心依赖

依赖Spark2.1.1版本:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

引入Scala编译插件:

<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.2</version>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
</plugin>

2、案例代码开发

读取指定位置的文件,并输出文件内容单词统计结果。

@RestController
public class WordWeb implements Serializable {

    @GetMapping("/word/web")
    public String getWeb (){
        // 1、创建Spark的配置对象
        SparkConf sparkConf = new SparkConf().setAppName("LocalCount")
                                             .setMaster("local[*]");

        // 2、创建SparkContext对象
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.setLogLevel("WARN");

        // 3、读取测试文件
        JavaRDD lineRdd = sc.textFile("/var/spark/test/word.txt");

        // 4、行内容进行切分
        JavaRDD wordsRdd = lineRdd.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(Object obj) throws Exception {
                String value = String.valueOf(obj);
                String[] words = value.split(",");
                return Arrays.asList(words).iterator();
            }
        });

        // 5、切分的单词进行标注
        JavaPairRDD wordAndOneRdd = wordsRdd.mapToPair(new PairFunction() {
            @Override
            public Tuple2 call(Object obj) throws Exception {
                //将单词进行标记:
                return new Tuple2(String.valueOf(obj), 1);
            }
        });

        // 6、统计单词出现次数
        JavaPairRDD wordAndCountRdd = wordAndOneRdd.reduceByKey(new Function2() {
            @Override
            public Object call(Object obj1, Object obj2) throws Exception {
                return Integer.parseInt(obj1.toString()) + Integer.parseInt(obj2.toString());
            }
        });

        // 7、排序
        JavaPairRDD sortedRdd = wordAndCountRdd.sortByKey();
        List<Tuple2> finalResult = sortedRdd.collect();

        // 8、结果打印
        for (Tuple2 tuple2 : finalResult) {
            System.out.println(tuple2._1 + " ===> " + tuple2._2);
        }

        // 9、保存统计结果
        sortedRdd.saveAsTextFile("/var/spark/output");
        sc.stop();
        return "success" ;
    }
}

打包执行结果:

实时计算框架:Spark集群搭建与入门案例

查看文件输出:

[root@hop01 output]# vim /var/spark/output/part-00000

四、源代码地址

GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent

实时计算框架:Spark集群搭建与入门案例

阅读标签

【Java基础】【设计模式】【结构与算法】【Linux系统】【数据库】

【分布式架构】【微服务】【大数据组件】【SpringBoot进阶】【Spring&Boot基础】

【数据分析】【技术导图】【 职场】

技术系列

OLAP引擎:Druid组件进行数据统计分析

OLAP引擎:Presto组件跨数据源分析

OLAP引擎:ClickHouse高性能列式查询

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/6232.html

(0)
上一篇 2023-04-14
下一篇 2023-04-14

相关推荐

  • mysql系列(十)——Mysql常见的工具

    mysql系列(十)——Mysql常见的工具一、压力测试工具——【mysqlslap】 二、 数据库表物理结构修改工具——【pt-online-schema-change】 三、慢查询分析工具—— 【mysqldumpslow】 四、慢查询分…

    2023-03-20
    154
  • MySQL必知存储引擎「建议收藏」

    MySQL必知存储引擎「建议收藏」Mysql存储引擎 1.MyISAM MySQL 5.0 之前的默认数据库引擎,最为常用。拥有较高的插入,查询速度,但不支持事务. 2.InnoDB事务型数据库的首选引擎,支持ACID事务,支持行级锁

    2022-12-30
    167
  • isleap用法python(c语言isleap什么意思)

    isleap用法python(c语言isleap什么意思)直接使用python calender模块即可。

    2023-10-31
    147
  • 如何设置Python在Linux中的环境变量

    如何设置Python在Linux中的环境变量在Linux中,环境变量是一些用于存储系统和用户信息的变量。一些常见的环境变量包括 PATH,HOME 和 SHELL 等。

    2024-03-17
    87
  • Python中的list clear方法

    Python中的list clear方法在Python编程语言中,list是使用最广泛的数据类型之一。Python中的list对象类似于其他编程语言中的数组。Python中的list可以存储各种类型的数据,包括字符串、数字、对象等等。这些数据可以通过list的方法来进行增加、删除、修改和查询等操作。其中,list clear方法可以清空一个list,使得它变为一个空list,接下来我们将详细介绍Python中的list clear方法。

    2024-07-08
    45
  • python使用apscheduler遇到错误:SQLAlchemyJobStore requires SQLAlchemy installed[亲测有效]

    python使用apscheduler遇到错误:SQLAlchemyJobStore requires SQLAlchemy installed[亲测有效]
    英文:SQLAlchemyJobStore requires SQLAlchemy installed 翻译下:SQLAlchemyJobStore需要安装…

    2023-04-05
    154
  • Python读取JSON文件并生成标题

    Python读取JSON文件并生成标题在Python开发中,读取JSON文件并且生成可视化的标题是常见的需求。JSON文件作为一种轻量级的数据交换格式,它易于人们理解和编写,同时易于计算机解析和生成,因此在很多场景下,JSON文件得到了广泛的应用。Python是一种功能强大且易学易用的编程语言,它提供了强大的JSON解析功能,并且支持生成各种可视化效果,本文将介绍如何使用Python读取JSON文件,并生成h1标题的方法。

    2024-04-22
    56
  • cloudera hadoop大数据平台实战指南_Hadoop

    cloudera hadoop大数据平台实战指南_Hadoop一、概述 在众多 Hadoop 版本中, CDH(Cloudera Hadoop) 是 Hadoop 众多分支中比较出色的版本, 它由Cloudera 发行和维护。CDH 基于 Apache 的 Ha

    2023-05-17
    141

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注