欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。
/ 发家史 /
熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。
Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。
Spark SQL
Spark SQL 提供了多种接口:
纯 Sql 文本;
dataset/dataframe api。
当然,相应的,也会有各种客户端:
sql 文本,可以用 thriftserver/spark-sql;
编码,Dataframe/dataset/sql。
/ Dataframe/Dataset API 简介 /
Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。
可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:
Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:
type DataFrame = Dataset[Row]
所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。
基本操作
val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”) df.show() import spark.implicits._ df.printSchema() df.select("name").show() df.select($"name", $"age" + 1).show() df.filter($"age" > 21).show() df.groupBy("age").count().show() spark.stop()
分区分桶 排序
分桶排序保存hive表 df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”) 分区以parquet输出到指定目录 df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 分区分桶保存到hive表 df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")
cube rullup pivot
cube sales.cube("city", "year”).agg(sum("amount")as "amount”) .show() rull up sales.rollup("city", "year”).agg(sum("amount")as "amount”).show() pivot 只能跟在groupby之后 sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()
/ SQL 编程 /
Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:
1. spark 代码
2. spark-sql的shell
3. thriftserver
支持 Spark SQL 自身的语法,同时也兼容 HSQL。
1. 编码
要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。
SQLContext
new SQLContext(SparkContext)
HiveContext
new HiveContext(spark.sparkContext)
SparkSession
不使用 hive 元数据:
val spark = SparkSession.builder() .config(sparkConf) .getOrCreate()
使用 hive 元数据:
val spark = SparkSession.builder() .config(sparkConf) .enableHiveSupport().getOrCreate()
使用
val df =spark.read.json("examples/src/main/resources/people.json") df.createOrReplaceTempView("people") spark.sql("SELECT * FROM people").show()
2. spark-sql 脚本
spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用
bin/spark-sql –help 查看配置参数。
需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试
show tables; select count(*) from student;
3. thriftserver
thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。
安装部署
/1 开启 hive 的 metastore
bin/hive --service metastore
/2 将配置文件复制到spark/conf/目录下
/3 thriftserver
sbin/start-thriftserver.sh --masteryarn --deploy-mode client
对于 yarn 只支持 client 模式。
/4 启动 bin/beeline
/5 连接到 thriftserver
!connect jdbc:hive2://localhost:10001
/ 用户自定义函数 /
1. UDF
定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:
val len = udf{(str:String) => str.length} spark.udf.register("len",len) val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.createOrReplaceTempView("employees") ds.show() spark.sql("select len(name) from employees").show()
2. UserDefinedAggregateFunction
定义一个 UDAF
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverageUDAF extends UserDefinedAggregateFunction { //Data types of input arguments of this aggregate function definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil) //Data types of values in the aggregation buffer defbufferSchema:StructType = { StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil) } //The data type of the returned value defdataType:DataType = DoubleType //Whether this function always returns the same output on the identical input defdeterministic: Boolean = true //Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to // standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides // the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still // immutable. definitialize(buffer:MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } //Updates the given aggregation buffer `buffer` with new input data from `input` defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={ if(!input.isNullAt(0)) { buffer(0) = buffer.getLong(0)+ input.getLong(0) buffer(1) = buffer.getLong(1)+ 1 } } // Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1` defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={ buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0) buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1) } //Calculates the final result defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1) }
使用 UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.createOrReplaceTempView("employees") ds.show() spark.udf.register("myAverage", MyAverageUDAF) val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show()
3. Aggregator
定义一个 Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.expressions.Aggregator case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverageAggregator extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble }
使用
spark.udf.register("myAverage2", MyAverageAggregator) import spark.implicits._ val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee] ds.show() val averageSalary = MyAverageAggregator.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show()
/ 数据源 /
1. 通用的 laod/save 函数
可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。
2. Parquet 文件
val parquetFileDF =spark.read.parquet("people.parquet") peopleDF.write.parquet("people.parquet")
3. ORC 文件
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json") ds.write.mode("append").orc("/opt/outputorc/") spark.read.orc("/opt/outputorc/*").show(1)
4. JSON
ds.write.mode("overwrite").json("/opt/outputjson/") spark.read.json("/opt/outputjson/*").show()
5. Hive 表
spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。
val spark = SparkSession .builder() .config(sparkConf) .enableHiveSupport() .getOrCreate() spark.sql("select count(*) from student").show()
6. JDBC
写入 mysql
wcdf.repartition(1).write.mode("append").option("user", "root") .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())
从 mysql 里读
val fromMysql = spark.read.option("user", "root") .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())
7. 自定义数据源
自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport} class DefaultSource extends DataSourceV2 with ReadSupport { def createReader(options: DataSourceOptions) = new SimpleDataSourceReader() }
import org.apache.spark.sql.Row import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader} import org.apache.spark.sql.types.{StringType, StructField, StructType} class SimpleDataSourceReader extends DataSourceReader { def readSchema() = StructType(Array(StructField("value", StringType))) def createDataReaderFactories = { val factoryList = new java.util.ArrayList[DataReaderFactory[Row]] factoryList.add(new SimpleDataSourceReaderFactory()) factoryList } }
import org.apache.spark.sql.Row import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} class SimpleDataSourceReaderFactory extends DataReaderFactory[Row] with DataReader[Row] { def createDataReader = new SimpleDataSourceReaderFactory() val values = Array("1", "2", "3", "4", "5") var index = 0 def next = index < values.length def get = { val row = Row(values(index)) index = index + 1 row } def close() = Unit }
使用
val simpleDf = spark.read .format("bigdata.spark.SparkSQL.DataSources") .load() simpleDf.show()
/ 优化器及执行计划 /
1. 流程简介
总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。
简单化成四个部分:
/1 analysis
Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。
/2 logical optimization
常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。
/3 physical planning
eg:SortExec 。
/4 Codegen
codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。
2. 自定义优化器
/1 实现
继承 Rule[LogicalPlan]
object MultiplyOptimizationRule extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case Multiply(left,right) if right.isInstanceOf[Literal] && right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 => println("=========> optimization of one applied") left } } spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule) val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1") println("after optimization")
/2 注册
spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)
/3 使用
selectExpr("amountPaid* 1")
3. 自定义执行计划
/1 物理计划
继承 SparkLan 实现 doExecute 方法。
/2 逻辑计划
继承 SparkStrategy 实现 apply。
case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan { override def children: Seq[SparkPlan] = Nil override protected def doExecute(): RDD[InternalRow] = { val row = org.apache.spark.sql.Row("hi",12L) val unsafeRow = toUnsafeRow(row, Array(org.apache.spark.sql.types.StringType,org.apache.spark.sql.types.LongType)) sparkContext.parallelize(Seq(unsafeRow),1) } def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = { val converter = unsafeRowConverter(schema) converter(row) } def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = { val converter = org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(schema) (row: org.apache.spark.sql.Row) => { converter(org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[org.apache.spark.sql.catalyst.InternalRow]) } } } case object NeverPlanned extends LeafNode { override def output: Seq[Attribute] = Nil } object TestStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Project(pblist, child) => println("mt fastOperator ------------>") FastOperator(pblist.map(_.toAttribute),planLater(child)) :: Nil case Union(children) => println("mt union ========>") UnionExec(children.map(planLater)) :: Nil case LocalRelation(output, data, _) => LocalTableScanExec(output, data):: Nil case _ => Nil } }
/3 注册到 Spark 执行策略
spark.experimental.extraStrategies =Seq(countStrategy)
/4 使用
spark.sql("select count(*) fromtest")