返回顶部
首页 > 资讯 > 数据库 >理解Spark SQL(三)—— Spark SQL程序举例
  • 668
分享到

理解Spark SQL(三)—— Spark SQL程序举例

理解SparkSQL(三)——SparkSQL程序举例 2020-09-04 18:09:04 668人浏览 绘本
摘要

上一篇说到,在spark 2.x当中,实际上sqlContext和HiveContext是过时的,相反是采用SparkSession对象的sql函数来操作SQL语句的。使用这个函数执行SQL语句前需要先调用DataFrame的createO

上一篇说到,在spark 2.x当中,实际上sqlContext和HiveContext是过时的,相反是采用SparkSession对象的sql函数来操作SQL语句的。使用这个函数执行SQL语句前需要先调用DataFrame的createOrReplaceTempView注册一个临时表,所以关键是先要将RDD转换成DataFrame。实际上,在Spark中实际声明了

type DataFrame = Dataset[Row]

所以,DataFrame是Dataset[Row]的别名。RDD是提供面向低层次的api,而DataFrame/Dataset提供面向高层次的API(适合于SQL等面向结构化数据的场合)。

下面提供一些Spark SQL程序的例子。

例子一:SparkSQLExam.Scala

 1 package bruce.bigdata.spark.example
 2 
 3 import org.apache.spark.sql.Row
 4 import org.apache.spark.sql.SparkSession
 5 import org.apache.spark.sql.types._
 6 
 7 object SparkSQLExam {
 8 
 9     case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
10     
11     def main(args: Array[String]) {
12 
13         val spark = SparkSession
14           .builder
15           .appName("SparkSQLExam")
16           .getOrCreate()
17         
18         runSparkSQLExam1(spark)
19         runSparkSQLExam2(spark)
20         
21         spark.stop()
22     
23     }
24     
25     
26     private def runSparkSQLExam1(spark: SparkSession): Unit = {
27     
28         import spark.implicits._
29         
30         val rddOffices=spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("	")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
31         val officesDataFrame = spark.createDataFrame(rddOffices)
32         
33         officesDataFrame.createOrReplaceTempView("offices")
34         spark.sql("select city from offices where region="Eastern"").map(t=>"City: " + t(0)).collect.foreach(println)
35         
36     
37     }
38     
39     private def runSparkSQLExam2(spark: SparkSession): Unit = {
40     
41          import spark.implicits._
42          import org.apache.spark.sql._
43          import org.apache.spark.sql.types._
44         
45          val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
46          val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("	")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
47          val dataFrame = spark.createDataFrame(rowRDD, schema)
48          
49          dataFrame.createOrReplaceTempView("offices2")        
50          spark.sql("select city from offices2 where region="Western"").map(t=>"City: " + t(0)).collect.foreach(println)
51         
52     }
53     
54 }

使用下面的命令进行编译:

[root@BruceCentos4 scala]# scalac SparkSQLExam.scala

在编译之前,需要在CLASSPATH中增加路径:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

然后打包成jar文件:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

然后通过spark-submit提交程序到yarn集群执行,为了方便从客户端查看结果,这里采用yarn cient模式运行。

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

运行结果截图:

 

例子二:SparkSQLExam.scala(需要启动hive metastore)

 1 package  bruce.bigdata.spark.example
 2 
 3 import org.apache.spark.sql.{SaveMode, SparkSession}
 4 
 5 object SparkHiveExam {
 6 
 7     def main(args: Array[String]) {
 8         
 9         val spark = SparkSession
10           .builder()
11           .appName("Spark Hive Exam")
12           .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
13           .enableHiveSupport()
14           .getOrCreate()
15        
16         import spark.implicits._
17         
18         //使用hql查看hive数据
19         spark.sql("show databases").collect.foreach(println)
20         spark.sql("use orderdb")
21         spark.sql("show tables").collect.foreach(println)
22         spark.sql("select city from offices where region="Eastern"").map(t=>"City: " + t(0)).collect.foreach(println)
23         
24         //将hql查询出的数据保存到另外一张新建的hive表
25         //找出订单金额超过1万美元的产品
26         spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string) 
27                    ROW FORMAT DELIMITED FIELDS TERMINATED BY "	" LINES TERMINATED BY "
" STORED AS TEXTFILE""")
28         spark.sql("""select mfr_id,product_id,description
29                    from products a inner join orders b
30                    on a.mfr_id=b.mfr and a.product_id=b.product
31                    where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")
32         
33         //hdfs文件数据导入到hive表中            
34         spark.sql("""CREATE TABLE IF NOT EXISTS offices2 (office int,city string,region string,mgr int,target double,sales double ) 
35                    ROW FORMAT DELIMITED FIELDS TERMINATED BY "	" LINES TERMINATED BY "
" STORED AS TEXTFILE""")
36         spark.sql("LOAD DATA INPATH "/user/hive/warehouse/orderdb.db/offices/offices.txt" INTO TABLE offices2")
37         
38         spark.stop()
39     }
40 }

使用下面的命令进行编译:

[root@BruceCentOS4 scala]# scalac SparkHiveExam.scala

使用下面的命令打包:

[root@BruceCentOS4 scala]# jar -cvf spark_exam_scala.jar bruce

使用下面的命令运行:

[root@BruceCentOS4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

程序运行结果:

 

另外上述程序运行后,hive中多了2张表:

 

 

例子三:spark_sql_exam.py

 1 from __future__ import print_function
 2 
 3 from pyspark.sql import SparkSession
 4 from pyspark.sql.types import *
 5 
 6 
 7 if __name__ == "__main__":
 8     spark = SparkSession 
 9         .builder 
10         .appName("python Spark SQL exam") 
11         .config("spark.some.config.option", "some-value") 
12         .getOrCreate()
13 
14     schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False), 
15         StructField("region", StringType(), False), StructField("mgr", IntegerType(), True), 
16         StructField("Target", DoubleType(), True), StructField("sales", DoubleType(), False)])
17         
18     rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("	")) 
19         .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))
20             
21     dataFrame = spark.createDataFrame(rowRDD, schema)
22     dataFrame.createOrReplaceTempView("offices")
23     spark.sql("select city from offices where region="Eastern"").show()
24     
25     spark.stop()

 执行命令运行程序:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

程序运行结果:

 

例子四:JavaSparkSQLExam.java

 1 package bruce.bigdata.spark.example;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.api.java.JavaRDD;
 7 import org.apache.spark.api.java.function.Function;
 8 import org.apache.spark.api.java.function.MapFunction;
 9 import org.apache.spark.sql.Dataset;
10 import org.apache.spark.sql.Row;
11 import org.apache.spark.sql.RowFactory;
12 import org.apache.spark.sql.SparkSession;
13 import org.apache.spark.sql.types.DataTypes;
14 import org.apache.spark.sql.types.StructField;
15 import org.apache.spark.sql.types.StructType;
16 import org.apache.spark.sql.AnalysisException;
17 
18 
19 public class JavaSparkSQLExam {
20     public static void main(String[] args) throws AnalysisException {
21         SparkSession spark = SparkSession
22           .builder()
23           .appName("Java Spark SQL exam")
24           .config("spark.some.config.option", "some-value")
25           .getOrCreate();    
26         
27         List fields = new ArrayList<>();
28         fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
29         fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
30         fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
31         fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
32         fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
33         fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));
34         
35         StructType schema = DataTypes.createStructType(fields);
36         
37         
38         JavaRDD officesRDD = spark.sparkContext()
39           .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
40           .toJavaRDD();
41         
42         JavaRDD rowRDD = officesRDD.map((Function) record -> {
43           String[] attributes = record.split("	");
44           return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
45         });
46         
47         Dataset dataFrame = spark.createDataFrame(rowRDD, schema);
48         
49         dataFrame.createOrReplaceTempView("offices");
50         Dataset results = spark.sql("select city from offices where region="Eastern"");
51         results.collectAsList().forEach(r -> System.out.println(r));
52         
53         spark.stop();
54     }
55 }

编译打包后通过如下命令执行:

[root@BruceCentOS4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

运行结果:

 

上面是一些关于Spark SQL程序的一些例子,分别采用了Scala/Python/Java来编写的。另外除了这三种语言,Spark还支持R语言编写程序,因为我自己也不熟悉,就不举例了。不管用什么语言,其实API都是基本一致的,主要是采用DataFrame和Dataset的高层次API来调用和执行SQL。使用这些API,可以轻松的将结构化数据转化成SQL来操作,同时也能够方便的操作Hive中的数据。

 

 

 

 

 

 

 

 

 

 

 

您可能感兴趣的文档:

--结束END--

本文标题: 理解Spark SQL(三)—— Spark SQL程序举例

本文链接: https://lsjlt.com/news/2662.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 理解Spark SQL(三)—— Spark SQL程序举例
    上一篇说到,在Spark 2.x当中,实际上SQLContext和HiveContext是过时的,相反是采用SparkSession对象的sql函数来操作SQL语句的。使用这个函数执行SQL语句前需要先调用DataFrame的createO...
    99+
    2020-09-04
    理解Spark SQL(三)—— Spark SQL程序举例
  • Spark-Sql入门程序示例详解
    SparkSQL运行架构 Spark SQL对SQL语句的处理,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而...
    99+
    2024-04-02
  • springboot集成spark并使用spark-sql的示例详解
    首先添加相关依赖: <xml version="1.0" encoding="UTF-8"> <project xmlns="http://maven.apache...
    99+
    2024-04-02
  • 理解Spark SQL(一)—— CLI和ThriftServer
    Spark SQL主要提供了两个工具来访问hive中的数据,即CLI和ThriftServer。前提是需要Spark支持Hive,即编译Spark时需要带上hive和hive-thriftserver选项,同时需要确保在$SPARK_HOM...
    99+
    2017-01-02
    理解Spark SQL(一)—— CLI和ThriftServer
  • 理解Spark SQL(二)—— SQLContext和HiveContext
    使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现。前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器和HiveSQL语法解析器,默认为Hiv...
    99+
    2017-07-25
    理解Spark SQL(二)—— SQLContext和HiveContext
  • Spark-Sql的示例分析
    这篇文章主要介绍Spark-Sql的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!SparkSQL运行架构Spark SQL对SQL语句的处理,首先会将SQL语句进行解析(Parse),然后形成一个Tree,...
    99+
    2023-06-21
  • Spark Streaming+Spark SQL的数据倾斜示例分析
    这篇文章将为大家详细讲解有关Spark Streaming+Spark SQL的数据倾斜示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1.现象 三台机器都有产生executor,每台...
    99+
    2023-06-03
  • Spark SQL小文件问题处理
    目录1.1、小文件危害1.2、产生小文件过多的原因1.3、如何解决这种小文件的问题呢?1.3.1、调优参数1.1、小文件危害 大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性: 1.Spark S...
    99+
    2023-04-07
    Spark SQL小文件问题处理 SQL小文件问题处理 Spark处理小文件
  • Spark SQL小文件问题处理
    目录1.1、小文件危害1.2、产生小文件过多的原因1.3、如何解决这种小文件的问题呢?1.3.1、调优参数1.1、小文件危害 大量的小文件会影响Hadoop集群管理或者Spark在处...
    99+
    2023-05-14
    Spark SQL小文件问题处理 SQL小文件问题处理 Spark处理小文件
  • Spark SQL配置及使用教程
    目录SparkSQL版本: SparkSQL DSL语法 SparkSQL和Hive的集成Spark应用依赖第三方jar包文件解决方案        SparkSQL的ThriftS...
    99+
    2024-04-02
  • 什么是扩展Spark SQL解析
    这篇文章主要讲解了“什么是扩展Spark SQL解析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“什么是扩展Spark SQL解析”吧! 理论基础...
    99+
    2024-04-02
  • Spark SQL小文件问题如何处理
    本篇内容主要讲解“Spark SQL小文件问题如何处理”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spark SQL小文件问题如何处理”吧!1.1、小文件危害大量的小文件会...
    99+
    2023-07-05
  • 如何使用IDEA开发Spark SQL程序(一文搞懂)
    目录前言Spark SQL是什么1、使用IDEA开发Spark SQL 1.1、指定列名添加Schema1.2、通过StructType指定Schema1.3、反射推断Sc...
    99+
    2024-04-02
  • Spark SQL的整体实现逻辑解析
    1、sql语句的模块解析    当我们写一个查询语句时,一般包含三个部分,select部分,from数据源部分,where限制条件部分,这三部...
    99+
    2024-04-02
  • Spark SQL中的RDD与DataFrame转换实例用法
    这篇文章主要讲解了“Spark SQL中的RDD与DataFrame转换实例用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark SQL中的RDD与DataFrame转换实例用法”吧...
    99+
    2023-06-02
  • Apache Spark SQL入门及实践的实例分析
    Apache Spark SQL入门及实践的实例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Apache SparkSQL是一个重要的Spark模块,我们...
    99+
    2023-06-02
  • Spark SQL数据加载和保存的实例分析
    今天就跟大家聊聊有关Spark SQL数据加载和保存的实例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、前置知识详解 Spark SQL重要...
    99+
    2024-04-02
  • 怎么理解spark的自定义分区和排序及spark与jdbc
    这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。//自定义分区import org.apache...
    99+
    2023-06-02
  • Spark SQL中出现CROSS JOIN 问题该怎么解决
    这篇文章将为大家详细讲解有关Spark SQL中出现CROSS JOIN 问题该怎么解决,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。问题显示如下所示:    ...
    99+
    2023-06-04
  • 详解Spark Sql在UDF中如何引用外部数据
    目录前言场景介绍方法一 Driver端加载方法二 Excutor端加载小结前言 Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义...
    99+
    2023-02-01
    Spark Sql UDF引用外部数据 Spark Sql UDF
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作