irpas技术客

06-SparkSQL_被数据处理的蜗牛_spark sql

未知 1598

1.spark sql 1.1.spark sql概述

官网地址:http://spark.apache.org/sql/

1.1.1.什么是spark sql

spark sql是spark用来处理结构化数据的一个模块,它提供了一个编程抽象DataFrame,作为分布式SQL查询的引擎,它是将spark sql转换成RDD,然后提交到集群中去运行,执行效率非常快。支持多种使用方式:SQL、DataFrame API、DataSet API。

相比于spark RDD API,spark sql包含了对结构化数据和在其上运算的更多信息,spark sql使用这些信息进行了额外的优化,对结构化数据的操作更加高效和方便。

1.1.2.spark sql优点

易整合

将sql查询与spark程序无缝整合。可以使用java、scala、python、R等语言的API操作。

统一的数据访问方式

以相同的方式连接到任何数据源。

兼容Hive

支持hive SQL的语法。

标准的数据连接

可以使用行业标准的JDBC或ODBC进行连接

六字诀: 易用、兼容、标准

1.2.DataFrame 1.2.1.什么是DataFrame

在spark中,DataFrame是一种以RDD为基础的分布式数据集,它的前身是SchemaRDD,类似于传统数据库的二维表格。

从Spark 1.3.0开始由SchemaRDD更名为DataFrame。它与SchemaRDD的主要区别:DataFrame不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。可以在DataFrame上调用rdd方法将其转换为一个RDD。

DataFrame带有Schema元数据信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,底层做了更多的优化。可以通过多种方式构建DataFrame:已经存在的RDD、结构化文件、外部数据库、Hive表。

dataframe记录了对应列的名称和类型

dataFrame引入schema和off-heap(使用操作系统层面上的内存) 1、解决了RDD的缺点 序列化和反序列化开销大频繁的创建和销毁对象造成大量的GC 2、丢失了RDD的优点 RDD编译时进行类型检查RDD具有面向对象编程的特性 1.2.2.DataFrame与RDD区别

RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息

DataFrame是分布式的Row对象的集合,其提供了由列组成的详细模式信息

DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)

DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是减少数据读取以及执行计划的优化,提升执行效 率。易用性更好

1.2.3.DataFrame与RDD的优缺点 RDD优缺点 优点: 编译时类型安全面向对象的编程风格 缺点: 序列化和反序列化的性能开销大GC的性能开销大 DataFrame优缺点 优点: DataFrame引入了schema,存储了结构化数据的元数据信息,操作方便DataFrame引入了off-heap(非堆内存),解决了RDD 中GC性能开销大的问题 缺点: DataFrame不是类型安全的DataFrame不是面向对象的 1.2.4.如何创建DataFrame

在spark2.0版本之前,spark sql中SqlContext是创建DataFrame和执行SQL的入口。利用hiveContext通过hive sql语句操作hive表数据,兼容hive操作,并且hiveContext继承自SQLContext。

在spark2.0之后,这些都统一于SparkSession,SparkSession 封装了SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。

启动spark-shell:(node03)

#cd /export/servers/spark-2.0.2-bin-hadoop2.7/bin spark-shell --master local[2]

1.2.4.1.读取文本文件创建DataFrame

1.准备数据文件

people.txt:文件内容有三列(id、name、age)

1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40

2.加载数据文件、关联样例类

#在spark-shell中加载数据文件 val lineRDD= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" ")) #定义People样例类 case class People(id:Int,name:String,age:Int) #将lineRDD和样例类People关联 val peopleRDD=lineRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt))

3.将peopleRDD转换成DataFrame

val peopleDF=peopleRDD.toDF

4.显示DataFrame中数据:

peopleDF.show

5.显示DataFrame中schema信息:

peopleDF.printSchema

6.直接通过SparkSession创建DataFrame

val dataFrame=spark.read.text("file:///export/servers/testdata/people.txt")

1.2.4.2.读取json文件创建DataFrame

1.准备数据文件

使用spark安装包下提供了数据文件:

#文件路径 $SPARK_HOME/examples/src/main/resources/people.json #加载数据文件 val jsonDF=spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json")

1.2.4.3.读取parquet列式存储格式文件创建DataFrame

1.准备数据文件

使用spark安装包下提供了数据文件:

#文件路径 $SPARK_HOME/examples/src/main/resources/users.parquet #加载数据文件 val parquetDF= spark.read.parquet("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/users.parquet")

细节:关于SparkSession读取文件的相关方法

输入**spark.read.**按tab键,自动提示

1.3.DataFrame常用操作 1.3.1.DSL风格语法

说明:DSL(Domain Specific Language)特定领域语言,是一种为了特定任务而设计的开发语言。比如XSLT、Html等。

1.3.1.1.启动spark-shell spark-shell --master local[2]

1.3.1.2.查看DataFrame完整内容 #加载数据文件 val rdd1= sc.textFile("file:///export/servers/testdata/people.txt").map(_.split(" ")) #定义样例类 case class People(id:Int,name:String,age:Int) #将rdd与样例类进行关联 val rdd2=rdd1.map(x=>People(x(0).toInt,x(1),x(2).toInt)) #将rdd2转换成DataFrame val peopleDF=rdd2.toDF

#查看peopleDF中的完整内容 peopleDF.show

1.3.1.3.查看DataFrame部分列内容 peopleDF.select("name").show peopleDF.select(col("name")).show peopleDF.select($"name").show peopleDF.select(peopleDF.col("name")).show

1.3.1.4.查看DataFrame部分列内容,在列上执行操作 peopleDF.select("name","age").show peopleDF.select(col("name"),col("age")).show

#取出年龄列数据,并且执行+1操作 peopleDF.select(col("name"),col("age"),col("age")+1).show

1.3.1.5查看DataFrame满足条件的内容 #查看年龄大于等于30的用户 peopleDF.filter(col("age")>=30).show

1.3.1.6.执行统计操作 #统计用户记录数 peopleDF.count

1.3.1.7.执行分组统计操作 #按照年龄列进行分组统计 peopleDF.groupBy("age").count.show

1.3.1.8.打印DataFrame的schema信息 peopleDF.printSchema

1.3.2.SQL风格语法

说明:DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。

使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

DataFrame.registerTempTable("表名称") 1.3.2.1.将DataFrame注册成表 peopleDF.registerTempTable("t_people")

1.3.2.2.查询年龄最大的前两名 spark.sql("select * from t_people order by age desc limit 2").show

1.3.2.3.查询年龄大于30的人信息 spark.sql("select * from t_people where age>30").show

1.3.2.4.显示表的schema信息 spark.sql("desc t_people").show

1.4.DataSet 1.4.1.什么是DataSet

DataSet是分布式的数据集合,提供了强类型支持,是在RDD的每行数据加了类型约束。DataSet是在spark1.6中添加的新的接口,它集中了RDD的优点和spark sql优化的执行引擎。

DataSet包含了DataFrame的功能,DataFrame表示为DataSet[Row],即DataSet的子集。在spark2.0中两者已经统一。

1.4.2.RDD、DataFrame、DataSet区别

RDD:

DataFrame:

DataSet:

**细节:**DataSet融合了RDD和DataFrame两者的优点。

DataSet中,数据有类型信息(DataFrame的优点)DataSet会在编译时检查类型(RDD的优点)DataSet是面向对象编程的接口(RDD的优点) 1.4.3.DataFrame与DataSet转换 1.4.3.1.DataFrame转换为DataSet DataFrame.as[ElementType] 1.4.3.2.DataSet转换为DataFrame DataSet.toDF() 1.4.4.创建DataSet 1.4.4.1.DataFrame转换生成DataSet #加载数据文件 val df1= spark.read.json("file:///export/servers/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/people.json") #定义样例类 case class Person(age:BigInt,name:String) #通过as[类型]转换成DataSet val ds1=df1.as[Person]

1.4.4.2.通过toDS方法生成DataSet #定义样例类 case class Person(id:Long,name:String,age:Long) #定义集合数据 val data=List(Person(1,"zhangsan",18),Person(2,"lisi",28)) #通过toDs方法转换成DataSet val ds2=data.toDS

1.4.4.3.spark.createDataSet方法创建DataSet #从已经存在的scala集合中构建DataSet val ds3=spark.createDataset(1 to 5) #从已经存在的rdd构建DataSet val rdd4=sc.textFile("file:///export/servers/testdata/people.txt") val ds4=spark.createDataset(rdd4)

2.编程方式执行spark sql查询 2.1.编写spark sql程序将RDD转换成DataFrame 2.1.1.通过反射推断schema方式

scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。这种RDD可以高效的转换为DataFrame并注册为表。

2.1.1.1.创建项目 2.1.1.2.导入依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.liny</groupId> <artifactId>spark-teach-day06-01project</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <scala.version>2.11.8</scala.version> <spark.version>2.0.2</spark.version> </properties> <!--配置依赖--> <dependencies> <!--scala依赖--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!--spark依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <!--配置插件--> <build> <plugins> <!--scala编译插件--> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- java 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project> 2.1.1.3.编写代码

scala版本:

package cn.liny.rdd.df import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * 学习通过反射推断方式,把RDD转换成DataFrame(scala版本) */ // 定义样例类 case class People(id:Int,name:String,age:Int) object RddToDFScala { // 执行入口 def main(args: Array[String]): Unit = { // 1.创建SparkSession对象 val spark: SparkSession = SparkSession.builder().appName("RddToDFScala").master("local[2]").getOrCreate() // 2.从SparkSession中,获取SparkContext对象 val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") // 3.加载数据文件 val peopleRDD: RDD[String] = sc.textFile("D:\\02teach\\02resources\\0322\\04bigdata\\testdata\\people.txt") // 4.切分每一行记录 val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" ")) // 5.将RDD与样例类进行关联 val casePeopleRDD: RDD[People] = splitRDD.map(x=>People(x(0).toInt,x(1),x(2).toInt)) // 6.通过toDF方法把RDD转换成DataFrame /** * 细节: * 调用toDF方法,需要手动导入隐士转换 */ import spark.implicits._ val peopleDF: DataFrame = casePeopleRDD.toDF // 7.通过DSL方式操作DataFrame==============================DSL // 显示schema信息 peopleDF.printSchema() // 显示所有字段 peopleDF.columns.foreach(x=>println(x)) // 显示数据,默认显示20行 peopleDF.show() // 显示第一行记录 val firstRow: Row = peopleDF.head() println(firstRow) // 统计记录数量 println(peopleDF.count()) // 显示指定列的值 peopleDF.select("id","name").show() // 过滤操作 peopleDF.filter($"age">30).show() // 分组统计操作 peopleDF.groupBy("age").count().show() // 8.通过sql语句方式操作DataFrame===============================sql println("通过sql语句方式操作DataFrame===============================sql") // 把DataFrame注册为表 peopleDF.createOrReplaceTempView("t_people") // 显示表的schema信息 spark.sql("desc t_people").show() // 查询全表数据 spark.sql("select * from t_people").show() // 9.释放资源 sc.stop() spark.stop() } }

java版本:

People javaBean:

package cn.liny.jrdd.po; import java.io.Serializable; /** * 实体类 */ public class People implements Serializable { private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String toString() { return "People{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } }

RddToDFJava:

package cn.liny.jrdd.df; import cn.liny.jrdd.po.People; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; /** * 学习通过反射推断方式,把RDD转换成DataFrame(java版本) */ public class RddToDFJava { public static void main(String[] args) { // 1.创建SparkSession对象 SparkSession spark = SparkSession.builder().appName("RddToDFJava").master("local[2]").getOrCreate(); // 2获取SparkContext对象,设置日志输出级别 SparkContext sc = spark.sparkContext(); sc.setLogLevel("WARN"); // 3.加载数据文件 Dataset<String> peopleDS = spark.read().textFile("D:\\02teach\\02resources\\0322\\04bigdata\\testdata\\people.txt"); JavaRDD<String> stringJavaRDD = peopleDS.toJavaRDD(); // 4.切分每一行数据,转换成 JavaRDD<People>对象 JavaRDD<People> peopleJavaRDD = stringJavaRDD.map(line -> { String[] arr = line.split(" "); // 创建People对象 People people = new People(); people.setId(Integer.valueOf(arr[0])); people.setName(arr[1]); people.setAge(Integer.valueOf(arr[2])); return people; }); // 5.调用createDataFrame转换 Dataset<Row> peopleDF = spark.createDataFrame(peopleJavaRDD, People.class); // 6.通过DSL方式操作DataFrame==============================DSL // 显示schema信息 peopleDF.printSchema(); // 显示所有字段 String[] columns = peopleDF.columns(); System.out.println(Arrays.asList(columns)); // 显示数据,默认显示20行 peopleDF.show(); // 显示第一行记录 Row head = peopleDF.head(); System.out.println(head); // 统计记录数量 System.out.println(peopleDF.count()); // 显示指定列的值 peopleDF.select("id","name","age").show(); // 分组统计操作 peopleDF.groupBy("age").count().show(); // 7.通过sql语句方式操作DataFrame===============================sql System.out.println("通过sql语句方式操作DataFrame===============================sql"); // 将DataFrame注册成表 peopleDF.createOrReplaceTempView("t_people"); // 显示表的schema信息 spark.sql("desc t_people").show(); // 查询全表数据 Dataset<Row> allRow = spark.sql("select * from t_people"); allRow.show(); // 查询年龄大于等于30 Dataset<Row> ageRow = spark.sql("select * from t_people where age>=30"); ageRow.show(); // 8.释放资源 sc.stop(); spark.stop(); } } 2.1.2.通过StructType定义schema方式 2.1.2.1.scala版本 package cn.liny.rdd.df import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} /** * 学习通过StructType方式,把RDD转换成DataFrame(scala版本) */ object RddToDFScalaByStructType { def main(args: Array[String]): Unit = { // 1.创建SparkSession val spark: SparkSession = SparkSession.builder().appName("RddToDFScalaByStructType").master("local[2]").getOrCreate() // 2.通过SparkSession,获取SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") // 3.加载数据文件 val peopleRDD: RDD[String] = sc.textFile("D:\\02teach\\02resources\\0322\\04bigdata\\testdata\\people.txt") // 4.切分每一行数据 val splitRDD: RDD[Array[String]] = peopleRDD.map(x=>x.split(" ")) // 5.加载数据到Row对象中 val rowRDD: RDD[Row] = splitRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) // 6.创建schema val structType: StructType = (new StructType) .add(StructField("id",IntegerType,false)) .add(StructField("name",StringType,false)) .add(StructField("age",IntegerType,false)) // 7.利用RDD和schema创建DataFrame val rowDF: DataFrame = spark.createDataFrame(rowRDD,structType) // 8.通过DSL方式操作DataFrame==============================DSL // 显示schema信息 rowDF.printSchema() // 显示数据,默认显示20行 rowDF.show() // 9.通过sql语句方式操作DataFrame===============================sql println("通过sql语句方式操作DataFrame===============================sql") // 将DataFrame注册为表 rowDF.createOrReplaceTempView("t_people") // 显示表结构schema信息 spark.sql("desc t_people").show() // 查询全部数据 spark.sql("select * from t_people").show() // 10.释放资源 sc.stop() spark.stop() } } 2.1.2.2.java版本 package cn.liny.jrdd.df; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.List; import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; /** * 学习通过StructType方式,把RDD转换成DataFrame(java版本) */ public class RddToDFJavaByStructType { public static void main(String[] args) { // 1.创建SparkSession SparkSession spark = SparkSession.builder().appName("RddToRDDJavaByStructType").master("local[2]").getOrCreate(); // 2.通过SparkSession,获取SparkContext SparkContext sc = spark.sparkContext(); sc.setLogLevel("WARN"); // 3.加载数据文件,并且转换成JavaRDD Dataset<String> stringDS = spark.read().textFile("D:\\02teach\\02resources\\0322\\04bigdata\\testdata\\people.txt"); JavaRDD<String> stringJavaRDD = stringDS.toJavaRDD(); // 4.切分每一行数据,加载数据到Row对象中 JavaRDD<Row> rowJavaRDD = stringJavaRDD.map(line -> { String[] arr = line.split(" "); Integer id = Integer.valueOf(arr[0]); String name = arr[1]; Integer age = Integer.valueOf(arr[2]); // 通过RowFactory工厂,创建Row对象 return RowFactory.create(id, name, age); }); // 5.通过StructType定义schema List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id",IntegerType,false)); fields.add(DataTypes.createStructField("name",StringType,false)); fields.add(DataTypes.createStructField("age",IntegerType,false)); StructType structType = DataTypes.createStructType(fields); // 6.利用RDD和schema定义DataFrame Dataset<Row> peopleDF = spark.createDataFrame(rowJavaRDD, structType); // 7.通过DSL方式操作DataFrame==============================DSL // 显示schema信息 peopleDF.printSchema(); // 显示数据,默认显示20行 peopleDF.show(); // 8.通过sql语句方式操作DataFrame===============================sql System.out.println("通过sql语句方式操作DataFrame===============================sql"); // 把DataFrame注册成表 peopleDF.createOrReplaceTempView("t_people"); // 显示表结构schema信息 spark.sql("desc t_people").show(); // 查询全部数据 spark.sql("select * from t_people").show(); // 9.释放资源 sc.stop(); spark.stop(); } } 2.2.编写spark sql程序操作HiveContext

HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类。

在spark2.0之后,HiveContext和SqlContext在SparkSession进行了统一,可以通过操作SparkSession来操作HiveContext和SqlContext。

2.2.1.导入依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> </dependency> 2.2.2.编写代码 package cn.liny.hi import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession /** * 学习spark sql操作hive */ object SparkHiveSupport { def main(args: Array[String]): Unit = { // 1.创建SparkSession对象 val spark = SparkSession.builder() .appName("SparkHiveSupport") .master("local[2]") .config("spark.sql.warehouse.dir","D:\\02teach\\03tmp\\spark\\spark-warehouse") .enableHiveSupport()// 开启支持hive .getOrCreate() // 2.通过SparkSession,获取SparkContext对象 val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") // 3.创建hive表 spark.sql("CREATE TABLE IF NOT EXISTS student(id int, name string, age int) row format delimited fields terminated by ' '") // 4.导入数据到hive表 spark.sql("LOAD DATA LOCAL INPATH 'D:/02teach/02resources/0322/04bigdata/testdata/people.txt' INTO TABLE student") // 5.执行sql查询 spark.sql("select * from student ").show() // 6.释放资源 sc.stop() spark.stop() } } 3.数据源

spark sql可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,再将数据写回关系型数据库中。

3.1.spark sql 从mysql数据库中加载数据 3.1.1.编写代码操作 3.1.1.1.导入依赖 <mysql.version>5.1.38</mysql.version> <!--依赖mysql--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> 3.1.1.2.编写代码

scala版本:

package cn.liny.db import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} /** * 学习spark sql从mysql加载数据 */ object DataFromMysql { def main(args: Array[String]): Unit = { // 1.创建SparkSession对象 val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() // 2.创建Properties对象,设置连接mysql的用户名和密码 val prop: Properties =new Properties() prop.setProperty("user","root") prop.setProperty("password","admin") // 3.读取mysql中的数据 val iplocationDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3308/spark","iplocation",prop) // 4.显示mysql中表的数据 iplocationDF.show() // 5.释放资源 spark.stop() } }

java版本:

package cn.liny.jrdd.db; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Properties; /** * 学习spark sql从mysql加载数据(java版本) */ public class DataFromMysqlForJava { public static void main(String[] args) { // 1.创建SparkSession对象 SparkSession spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate(); //2.创建Properties对象,设置连接mysql的用户名和密码 Properties prop = new Properties(); prop.setProperty("user","root"); prop.setProperty("password","admin"); // 3.读取mysql中的数据 Dataset<Row> iplocation = spark.read().jdbc("jdbc:mysql://127.0.0.1:3308/spark", "iplocation", prop); // 4.显示数据 iplocation.show(); // 5.释放资源 spark.stop(); } } 3.1.2.通过spark-shell 操作 3.1.2.1.启动spark-shell,需要指定mysql驱动包 spark-shell \ --master spark://node01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar \ --driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar

3.1.2.2.从mysql中加载数据 val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.53.120:3306/mysql", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load() mysqlDF.show

3.2.spark sql将数据写入到mysql数据库中 3.2.1.本地模式运行 package cn.liny.db import java.util.Properties import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * 学习spark sql将数据写入mysql */ // 定义样例类 case class Teacher(id:Int,name:String,age:Int) object DataToMysql { def main(args: Array[String]): Unit = { // 1.创建SparkSession对象 val spark: SparkSession = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate() // 2.读取数据 val data: RDD[String] = spark.sparkContext.textFile(args(0)) // 3.切分每一行 val arrRDD: RDD[Array[String]] = data.map(_.split(" ")) // 4.RDD关联样例类 val teacherRDD: RDD[Teacher] = arrRDD.map(x=>Teacher(x(0).toInt,x(1),x(2).toInt)) // 5.导入隐式转换 import spark.implicits._ // 6.将RDD转换成DataFrame val teacherDF: DataFrame = teacherRDD.toDF() // 7.将DataFrame注册成表 teacherDF.createOrReplaceTempView("teacher") // 8.操作teacher表 ,按照年龄进行降序排列 val resultDF: DataFrame = spark.sql("select * from teacher order by age desc") // resultDF.show() // 9.把结果保存在mysql表中 // 9.1.创建Properties对象,设置连接mysql的用户名和密码 val prop: Properties =new Properties() prop.setProperty("user","root") prop.setProperty("password","admin") // 9.2.操作数据库表 /** * 写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错 */ resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3308/spark","teacher",prop) // 释放资源 spark.stop() } }

3.2.2.打包提交到集群环境运行 3.2.2.1.改造代码 package cn.liny.db import java.util.Properties import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * 学习spark sql将数据写入mysql(提交集群环境运行) */ // 定义样例类 case class TeacherMaster(id:Int,name:String,age:Int) object DataToMysqlForMaster { def main(args: Array[String]): Unit = { // 1.创建SparkSession对象 val spark: SparkSession = SparkSession.builder().appName("DataToMysqlForMaster").getOrCreate() // 2.读取数据 val data: RDD[String] = spark.sparkContext.textFile(args(0)) // 3.切分每一行 val arrRDD: RDD[Array[String]] = data.map(_.split(" ")) // 4.RDD关联样例类 val teacherRDD: RDD[TeacherMaster] = arrRDD.map(x=>TeacherMaster(x(0).toInt,x(1),x(2).toInt)) // 5.导入隐式转换 import spark.implicits._ // 6.将RDD转换成DataFrame val teacherDF: DataFrame = teacherRDD.toDF() // 7.将DataFrame注册成表 teacherDF.createOrReplaceTempView("teacher") // 8.操作teacher表 ,按照年龄进行降序排列 val resultDF: DataFrame = spark.sql("select * from teacher order by age desc") // resultDF.show() // 9.把结果保存在mysql表中 // 9.1.创建Properties对象,设置连接mysql的用户名和密码 val prop: Properties =new Properties() prop.setProperty("user","root") prop.setProperty("password","bigdata") // 9.2.操作数据库表 /** * 写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错 */ resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.80.22:3306/spark","teacher",prop) // 释放资源 spark.stop() } } 3.2.2.2.打包

3.2.2.3.提交集群运行 #将文件上传到hdfs hdfs dfs -mkdir -p /spark/sql/input hdfs dfs -put people.txt /spark/sql/input #提交执行任务 spark-submit \ --class cn.liny.db.DataToMysqlForMaster \ --master spark://node01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ --jars /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar \ --driver-class-path /export/servers/testjar/mysql-connector-java-5.1.32-bin.jar \ /export/servers/testjar/spark_sql-1.0-SNAPSHOT.jar \ hdfs://192.168.53.100:8020/spark/sql/input/people.txt


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #spark #SQL #1Spark #sql11spark #SQLSpark