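As noted in the previous post, in Spark 2.x SQLContext and HiveContext are superseded by SparkSession (HiveContext is formally deprecated, and SQLContext remains only for backward compatibility). SQL statements are run through the sql function of the SparkSession object. Before running a query with this function, you first call createOrReplaceTempView on a DataFrame to register a temporary view, so the key step is converting the RDD into a DataFrame. Spark itself declares: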
type DataFrame = Dataset[Row]
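So DataFrame is simply an alias for Dataset[Row]. RDDs expose the low-level API, while DataFrame/Dataset expose the high-level API, which suits structured data and SQL-style workloads.
Below are some example Spark SQL programs.
Example 1: SparkSQLExam.scala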
package bruce.bigdata.spark.example

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkSQLExam {

  // One record of offices.txt; the field order matches the columns of the file.
  case class Office(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

  // Field delimiter, assumed to be a tab here; adjust it to match your data file.
  private val Delimiter = "\t"
  private val OfficesPath = "/user/hive/warehouse/orderdb.db/offices/offices.txt"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("SparkSQLExam")
      .getOrCreate()

    runSparkSQLExam1(spark)
    runSparkSQLExam2(spark)

    spark.stop()
  }

  // Approach 1: let Spark infer the schema by reflection from a case class.
  private def runSparkSQLExam1(spark: SparkSession): Unit = {
    import spark.implicits._

    val rddOffices = spark.sparkContext
      .textFile(OfficesPath)
      .map(_.split(Delimiter))
      .map(p => Office(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val officesDataFrame = spark.createDataFrame(rddOffices)

    officesDataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region = 'Eastern'")
      .map(t => "City: " + t(0))
      .collect()
      .foreach(println)
  }

  // Approach 2: build an RDD[Row] and attach an explicitly declared schema.
  private def runSparkSQLExam2(spark: SparkSession): Unit = {
    import spark.implicits._

    val schema = StructType(Array(
      StructField("office", IntegerType, nullable = false),
      StructField("city", StringType, nullable = false),
      StructField("region", StringType, nullable = false),
      StructField("mgr", IntegerType, nullable = true),
      StructField("target", DoubleType, nullable = true),
      StructField("sales", DoubleType, nullable = false)))
    val rowRDD = spark.sparkContext
      .textFile(OfficesPath)
      .map(_.split(Delimiter))
      .map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val dataFrame = spark.createDataFrame(rowRDD, schema)

    dataFrame.createOrReplaceTempView("offices2")
    spark.sql("select city from offices2 where region = 'Western'")
      .map(t => "City: " + t(0))
      .collect()
      .foreach(println)
  }

}
Before compiling, add the Spark and Hadoop jars to CLASSPATH:
export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)
Then compile with the following command:
[root@BruceCentOS4 scala]# scalac SparkSQLExam.scala
Then package the compiled classes into a jar file:
[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce
Then submit the program to the YARN cluster with spark-submit. To make the results easy to see from the client, it runs in yarn client mode here.
[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar
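If the submission is scripted rather than typed by hand, the argument list can be assembled programmatically. The following is only a sketch: build_spark_submit is a hypothetical helper, not part of Spark, and the /opt/spark fallback path is an assumption. The only things taken from the command above are the --class, --master and --deploy-mode flags and their values. The --class flag is left out when no main class is given, as is the case for a Python application.

```python
import os
import shlex


def build_spark_submit(spark_home, app, main_class=None,
                       master="yarn", deploy_mode="client"):
    """Return the spark-submit argument list for a jar or a .py application.

    Hypothetical helper for illustration; it only assembles strings and
    does not launch anything.
    """
    argv = [f"{spark_home}/bin/spark-submit"]
    if main_class is not None:
        # --class applies to JVM applications only.
        argv += ["--class", main_class]
    argv += ["--master", master, "--deploy-mode", deploy_mode, app]
    return argv


if __name__ == "__main__":
    # /opt/spark is an assumed fallback, used only when SPARK_HOME is unset.
    home = os.environ.get("SPARK_HOME", "/opt/spark")
    argv = build_spark_submit(
        home,
        "spark_exam_scala.jar",
        main_class="bruce.bigdata.spark.example.SparkSQLExam",
    )
    # Print the command; pass argv to subprocess.run to actually submit.
    print(shlex.join(argv))
```

Keeping the arguments as a list rather than one string avoids quoting problems if the list is later passed to subprocess.run.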
Output of the run (screenshot):
Example 2: SparkHiveExam.scala (requires a running Hive metastore)
package bruce.bigdata.spark.example

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkHiveExam {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Exam")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Query Hive data with HiveQL.
    spark.sql("show databases").collect().foreach(println)
    spark.sql("use orderdb")
    spark.sql("show tables").collect().foreach(println)
    spark.sql("select city from offices where region = 'Eastern'")
      .map(t => "City: " + t(0))
      .collect()
      .foreach(println)

    // Save the result of a HiveQL query into a newly created Hive table:
    // find the products with order amounts above 10,000 US dollars.
    // The '\t' and '\n' delimiters are assumed; adjust them to match your data.
    spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
    spark.sql("""select mfr_id,product_id,description
      from products a inner join orders b
      on a.mfr_id=b.mfr and a.product_id=b.product
      where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

    // Load an HDFS file into a Hive table.
    // Note: LOAD DATA INPATH moves (rather than copies) the HDFS file into the table's location.
    spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE""")
    spark.sql("LOAD DATA INPATH '/user/hive/warehouse/orderdb.db/offices/offices.txt' INTO TABLE offices2")

    spark.stop()
  }
}
Compile with the following command:
[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala
Package with the following command:
[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce
Run with the following command:
[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar
Program output:
After the program has run, Hive also contains two new tables (products_high_sales and offices2):
Example 3: spark_sql_exam.py
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType


if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .appName("Python Spark SQL exam")
             .config("spark.some.config.option", "some-value")
             .getOrCreate())

    # Explicit schema: (name, type, nullable) for each column.
    schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
                         StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
                         StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

    # Fields are assumed to be tab-separated; adjust the delimiter to match the file.
    rowRDD = (spark.sparkContext
              .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
              .map(lambda line: line.split("\t"))
              .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()),
                              float(p[4].strip()), float(p[5].strip()))))

    dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region = 'Eastern'").show()

    spark.stop()
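The schema above marks mgr and target as nullable, yet the parsing lambda calls int() and float() on those fields unconditionally, so an empty field would raise an error. The sketch below moves the parsing into a named function that maps empty nullable fields to None. It is plain Python and can be tested without a cluster. The function name parse_office, the tab delimiter, and the sample record are assumptions for illustration, not taken from the original data.

```python
from typing import Optional, Tuple

# (office, city, region, mgr, target, sales), in schema order.
OfficeRow = Tuple[int, str, str, Optional[int], Optional[float], float]


def parse_office(line: str, sep: str = "\t") -> OfficeRow:
    """Parse one line of the offices file into a tuple matching the schema.

    Assumes `sep`-separated fields (tab by default). Whitespace is stripped
    from every field, and empty mgr/target values become None because the
    schema declares those two columns nullable.
    """
    fields = [f.strip() for f in line.rstrip("\n").split(sep)]
    if len(fields) != 6:
        raise ValueError(f"expected 6 fields, got {len(fields)}: {line!r}")
    office, city, region, mgr, target, sales = fields
    return (
        int(office),
        city,
        region,
        int(mgr) if mgr else None,
        float(target) if target else None,
        float(sales),
    )


if __name__ == "__main__":
    # Hypothetical sample record, for illustration only.
    sample = "11\tNew York\tEastern\t106\t575000.0\t692637.0"
    print(parse_office(sample))
```

In the PySpark program, this function could replace the two chained lambdas, for example textFile(path).map(parse_office), with the rest of the program unchanged.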
Run the program with the following command:
[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py
Program output:
Example 4: JavaSparkSQLExam.java
package bruce.bigdata.spark.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;


public class JavaSparkSQLExam {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark SQL exam")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

        // Explicit schema: (name, type, nullable) for each column.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<String> officesRDD = spark.sparkContext()
            .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
            .toJavaRDD();

        // Fields are assumed to be tab-separated; adjust the delimiter to match the file.
        JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
            String[] attributes = record.split("\t");
            return RowFactory.create(
                Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2],
                Integer.valueOf(attributes[3].trim()),
                Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
        });

        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

        dataFrame.createOrReplaceTempView("offices");
        Dataset<Row> results = spark.sql("select city from offices where region = 'Eastern'");
        results.collectAsList().forEach(r -> System.out.println(r));

        spark.stop();
    }
}
After compiling and packaging, run it with the following command:
[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar
Output of the run:
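The examples above are Spark SQL programs written in Scala, Python, and Java. Spark also supports R, but since I am not familiar with it, I have not included an R example. Whatever the language, the API is essentially the same: you use the high-level DataFrame and Dataset API to issue and run SQL. With these APIs you can easily query structured data with SQL, and just as easily work with data stored in Hive.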
When reposting, please credit the source: https://daima100.com/10758.html