理解Spark SQL(二)—— SQLContext和HiveContext

理解Spark SQL(二)—— SQLContext和HiveContext使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现。前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器

使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现。前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器来运行HiveQL不支持的语法,如:select 1。实际上HiveContext是SQLContext的子类,因此在HiveContext运行过程中除了override的函数和变量,可以使用和SQLContext一样的函数和变量。

因为spark-shell工具实际就是运行的scala程序片段,为了方便,下面采用spark-shell进行演示。

首先来看SQLContext,因为是标准SQL,可以不依赖于Hive的metastore,比如下面的例子(没有启动hive metastore):

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell –master yarn –conf spark.sql.catalogImplementation=in-memory

 

 scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices

scala> val rddOffices=sc.textFile(“/user/hive/warehouse/orderdb.db/offices/offices.txt”).map(_.split(” “)).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26

scala> val officesDataFrame = spark.createDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string … 4 more fields]

scala> officesDataFrame.createOrReplaceTempView(“offices”)

scala> spark.sql(“select city from offices where region=”Eastern””).map(t=>”City: ” + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

 执行上面的命令后,实际上在yarn集群中启动了一个yarn client模式的Spark Application,然后在scala>提示符后输入的语句会生成RDD的transformation,最后一条命令中的collect会生成RDD的action,即会触发Job的提交和程序的执行。

命令行中之所以加上–conf spark.sql.catalogImplementation=in-memory选项,是因为spark-shell中的默认启动的SparkSession对象spark是默认支持Hive的,不带这个选项启动的话,程序就会去连接hive metastore,因为这里并没有启动hive metastore,因此程序在执行createDataFrame函数时会报错。

程序中的第一行是1个case class语句,这里是定义后面的数据文件的模式的(定义模式除了这个方法,其实还有另外一种方法,后面再介绍)。第二行从hdfs中读取一个文本文件,并工通过map映射到了模式上面。第三行基于第二行的RDD生成DataFrame,第四行基于第三行的DataFrame注册了一个逻辑上的临时表,最后一行就可以通过SparkSession的sql函数来执行sql语句了。

实际上,SQLContext是Spark 1.x中的SQL入口,在Spark 2.x中,使用SparkSession作为SQL的入口,但是为了向后兼容,Spark 2.x仍然支持SQLContext来操作SQL,不过会提示deprecated,所以上面的例子是采用Spark 2.x中的写法。

实际上还有另外一种方法来操作SQL,针对同样的数据,例如:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType(Array(StructField(“office”, IntegerType, false), StructField(“city”, StringType, false), StructField(“region”, StringType, false), StructField(“mgr”, IntegerType, true), StructField(“target”, DoubleType, true), StructField(“sales”, DoubleType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,true), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))

scala> val rowRDD = sc.textFile(“/user/hive/warehouse/orderdb.db/offices/offices.txt”).map(_.split(” “)).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30

scala> val dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string … 4 more fields]

scala> dataFrame.createOrReplaceTempView(“offices”)

scala> spark.sql(“select city from offices where region=”Eastern””).map(t=>”City: ” + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

这个例子与之前的例子有一些不同,主要的地方有3个:

1. 之前的例子是采用case class定义模式,Spark采用反射来推断Schema;而这个例子采用StructType类型的对象来定义模式,它接收一个数组,数组成员是StructField对象,代表一个字段的定义,每个字段的定义由字段名称、字段类型和是否允许为空组成;

2. 对于代表数据的RDD,之前的例子是直接用case class定义的类型来分割字段,而这个例子是用的Row类型;

3. 在使用createDataFrame函数生成DataFrame时,该函数的参数不一样,之前的例子只要传入RDD对象即可(对象中隐含了模式),而这个例子需要同时传入RDD和定义的schema;

实际编程中建议采用第二种方法,因为其更加灵活,schema信息可以不必是写死的,而是可以在程序运行的过程中生成。

 

下面接着来看HiveContext的用法,使用HiveContext之前需要确保:

  • 使用的Spark是支持Hive的;
  • Hive的配置文件hive-site.xml已经在Spark的conf目录下;
  • hive metastore已经启动;

举例说明:

首先启动hive metastore:

[root@BruceCentOS ~]# nohup hive –service metastore &

然后仍然通过spark-shell来举例说明,启动spark-shell,如下所示:

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell –master yarn

scala> spark.sql(“show databases”).collect.foreach(println)
[default]
[orderdb]

scala> spark.sql(“use orderdb”)
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql(“show tables”).collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]

scala> spark.sql(“select city from offices where region=”Eastern””).map(t=>”City: ” + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

可以看到这次启动spark-shell没有带上最后那个选项,这是因为这里我们打算用HiveContext来操作Hive中的数据,需要支持Hive。前面说过spark-shell是默认开启了Hive支持的。同SQLContext类似,Spark 2.x中也不需要再用HiveContext对象来操作SQL了,直接用SparkSession对象来操作就好了。可以看到这里可以直接操作表,不用再定义schema,这是因为schema是由外部的hive metastore定义的,spark通过连接到hive metastore来读取表的schema信息,因此这里能直接操作SQL。

 

另外,除了上面的使用SQLContext操作普通文件(需要额外定义模式)和使用HiveContext操作Hive表数据(需要开启hive metastore)之外,SQLContext还能操作JSON、PARQUET等文件,由于这两种数据文件自己带了模式信息,因此可以直接基于文件创建DataFrame,例如:

scala> val df = spark.read.json(“file:///opt/spark/examples/src/main/resources/people.json”)
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.createOrReplaceTempView(“people”)

scala> spark.sql(“select name,age from people where age>19″).map(t=>”Name :” + t(0) + “, Age: ” + t(1)).collect.foreach(println)
Name :Andy, Age: 30    

 

最后来看下DataFrame的另一种叫做DSL(Domain Specific Language)的用法。

scala> val df = spark.read.json(“file:///opt/spark/examples/src/main/resources/people.json”)
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.show()
+—-+——-+                                                                  
| age|   name|
+—-+——-+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+—-+——-+

scala> df.select(“name”).show()
+——-+                                                                       
|   name|
+——-+
|Michael|
|   Andy|
| Justin|
+——-+

scala> df.select(df(“name”), df(“age”) + 1).show()
+——-+———+                                                             
|   name|(age + 1)|
+——-+———+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+——-+———+

scala> df.filter(df(“age”) > 21).show()
+—+—-+
|age|name|
+—+—-+
| 30|Andy|
+—+—-+

scala> df.groupBy(“age”).count().show()
+—-+—–+                                                                    
| age|count|
+—-+—–+
|  19|    1|
|null|    1|
|  30|    1|
+—-+—–+

scala>

以上是对Spark SQL的SQLContext和HiveContext基本用法的一些总结,都是采用spark-shell工具举的例子。实际上由于spark-shell是运行scala程序片段的工具,上述例子完全可以改成独立的应用程序。我将在下一篇博文当中尝试使用Scala、Java和Python来编写独立的程序来操作上面的示例hive数据库orderdb,可以适当使用一些较为复杂的SQL来统计分析数据。

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/10832.html

(0)
上一篇 2022-12-21
下一篇 2022-12-21

相关推荐

  • redis持久化存储[亲测有效]

    redis持久化存储[亲测有效]redis持久化存储

    2023-03-17
    158
  • Python字符串格式化:让输出更加美观优雅

    Python字符串格式化:让输出更加美观优雅Python中的字符串格式化是一种非常强大的工具,它可以将字符串中的变量替换为具体的值,并且可以格式化输出,使得输出的结果更加美观和优雅。本文将从多个方面对Python字符串格式化进行介绍和阐述。

    2024-01-01
    123
  • 优化代码流程的小技巧——Python中的elif语句

    优化代码流程的小技巧——Python中的elif语句在Python中,我们经常需要根据一定的条件来执行相应的代码,而多个条件的情况下,我们常常会使用if-else语句来完成不同的分支判断。但是,如果条件分支较多的情况下,嵌套的if-else语句将会使程序难以理解和维护,这时,我们可以使用Python中的elif语句,来优化代码流程,提高程序的可读性和可维护性。

    2024-01-01
    106
  • MySQL主从半同步复制「建议收藏」

    MySQL主从半同步复制「建议收藏」 一、半同步复制 1.半同步复制概念 从MYSQL5.5开始,支持半自动复制。之前版本的MySQL Replication都是异步(asynchronous)的,主库在执行完一些事务后, 是不…

    2023-03-27
    139
  • 使用Python的Tkinter创建丰富的用户界面

    使用Python的Tkinter创建丰富的用户界面在Python中,Tkinter是最流行的GUI库之一。Tkinter使得在Python中创建图形化用户界面变得非常简单。它使用Tk GUI工具包,该工具包最初是为Tcl语言编写的,但现在也可用于其他编程语言。Tkinter除了可以用于创建基本的GUI组件之外,还可以创建复杂的、高度可定制的GUI组件,如表格、treeview和编辑器。在本文中,我们将介绍如何使用Tkinter创建具有丰富用户界面的Python应用程序。

    2023-12-20
    110
  • 用python取证调查(python向上取证)

    用python取证调查(python向上取证)链接:

    2023-10-30
    131
  • 用一行Python代码将列表内容打印成字符串

    用一行Python代码将列表内容打印成字符串Python中的join方法可以将列表中的元素拼接成字符串,而不需要使用循环。我们可以直接使用join将列表内容打印出来。

    2024-03-28
    73
  • 如何在Python中安装tkinter模块

    如何在Python中安装tkinter模块在Python中,Tkinter是最常用的用户图形界面(GUI)编程模块之一。Tkinter是Python自带的一个GUI模块,它提供了用户界面上常用的组件。Tkinter是一个跨平台的图形用户界面(GUI)模块。如果您正在学习Python的GUI编程,那么Tkinter将是您的良好选择。在本文中,我们将学习关于如何在Python中安装Tkinter模块的方法。

    2024-08-19
    26

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注