Spark实战:工程实践

工欲善其事,必先利其器。

(本文是基于 sbt 来配置 Spark 开发的工程化,支持 Scala/Java 编程语言。Python 和 R 用户需要使用其它方式来实现工程化。)

今天谈谈Spark开发中的工程化问题。我们都知道Spark程序是在集群上跑的,需要把程序打包后使用 $SPARK_HOME/bin/spark-sumibt 到Spark集群上。

在开发、测试时,每次代码修改后都打包、提交、运行……效率还是比较差的。而提交到集群上的程序一般情况下都是连接的生产环境的数据,先不说安全问题,就当每次都要完整跑完生产环境的数据也是很费时的事情。

更佳的实践是我们在开发时连接 local 模式,或者使用一个单独的比较小的集群,以及一个数量比较少的数据集来进行测试。

使用sbt配置开发环境

我们来看看 build.sbt 配置,在开发 spark 应用时需要的基本设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scalaVersion := "2.11.7"

scalacOptions ++= Seq(
"-encoding", "utf8",
"-unchecked",
"-feature",
"-deprecation"
)

assemblyJarName in assembly := "spark-startup.jar"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

test in assembly := {}


val verSpark = "1.5.2"
val verHadoop = "2.6.2"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.5.2" % "provided,test",
"org.apache.spark" %% "spark-sql" % "1.5.2" % "provided,test",
"org.scalatest" %% "scalatest" % "2.2.6"
)

我们添加 spark-corespark-sqlscalatest两个依赖库,前两个提供了 Spark RDD和 Spark SQL/DataFrame 编程支持,后一个提供了测试支持。

这里需要注意的点是:provided 配置,它的含意是在打包所指定的库由运行时环境提供,这里只是开发时需要依赖它。这一点很重要,若我们在提交给 Spark 的jar包里包含了 spark-xxx 库,会引起运行时错误。

我们还需要给 sbt 添加 sbt-assembly 插件,用于更好的控制打包过程。project/plugins.sbt

1
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.2")

开发

Spark的开发不是本文重点,这里会分享些我在日常开发中的经验。关于:测试和提交到集群运行。先来看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package example

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Yang Jing (yangbajing@gmail.com) on 2016-03-12.
*/
class SparkApp(sc: SparkContext) extends Serializable {
val sqlContext = new SQLContext(sc)

def run() = {
val jsonStrings = Seq(
"""{"name":"杨景","age":31}""",
"""{"name":"羊八井","age"31}""",
"""{"name":"yangbajing","age":31}"""
)
val rdd = sc.parallelize(jsonStrings)
val sql = sqlContext.read.json(rdd)
sql.show()
}

}

object SparkApp {

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
val sc = new SparkContext(conf)

val app = new SparkApp(sc)
app.run()
}

}

这里我们定义了 SparkApp 类和它的伴生对象,业务逻辑将写在 SparkApp 类里,SparkConf 将在伴生对象的 main 方法中定义。SparkContext 是做为参数传入 SparkApp 类的,在 object SparkAPP 中并未设置 master 和 appName ,因为这两个值将会在提交脚本(一个简单的shell脚本,用以指定 spark-submit 的相关参数)中设置。

(需要注意的是 SparkApp class要实现 Serializable 接口,只有这样 Spark 才能正常的使用序列化将代码分配的集群中运行。)

提交脚本

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env bash

$SPARK_HOME/bin/spark-submit \
--class sample.SparkApp \
--master spark://sc-data-server-1:7077 \
--name "sample.SparkApp" \
--executor-memory 10G \
--driver-memory 2G \
--total-executor-cores 4 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
../target/scala-2.11/spark-startup.jar &

提交脚本里我们设置了将程序提交到 Spark 集群运行所需要的参数:

  • –class: 指定 jar 包里要执行的 main 方法所在类
  • –master: 指定 spark master 地址
  • –name: 设置程序名字
  • –executor-memory: 指定每个 Work 所能使用内存
  • –total-executor-cores: 设置集群使用CPU core总数
  • –driver-memory: 指定驱动程序使用内存
  • –conf: 设置额外参数,有多个参数需要设置可使用多个 –conf 配置项
  • jar path: 脚本的最后一荐设置 jar 包路径

这里 –class, –master, –mane 和最后的 jar 包路径是必需设置的参数。

提交脚本已经写好了,那怎样生成 jar(../target/scala-2.11/spark-startup.jar) 包呢?使用 sbt assembly 命令可以生成 jar 包在 target/scala-2.11` 目录下。

使用 sbt 来开发 spark 的产品级程序,这样就可以了。但若每次修改代码都执行以上打包、提交到集群等步骤。效率还是有点慢,且实际上也不是个好的方式。接下来,再介绍下怎么使用 scalatest 来测试我们的 spark 程序。

测试程序

首先来看看我们的 SparkAppTest 测序程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class SparkAppTest extends WordSpec {

"SparkAppTest" should {

"run" in {
val conf = new SparkConf()
.setAppName("SparkAppTest")
.setMaster("local[*]")
val sc = new SparkContext(conf)

val app = new SparkApp(sc)
app.run()
}

}
}

在测试程序中,我们设置了 appName,并将 master 地址设置为:local[*],含义是使用本地 Spark 模式,并使用所有CPU核。

总结

做为一篇实战文章,是肯定会有代码详示的,代码在此:https://github.com/yangbajing/scala-applications/tree/master/spark-startup

sbt 是一款不错的项目构建工具,但是国内用户使用时会遇到“墙”的问题。现在好了,国内 Scala 社区建立了 Repox 社区公服,它解决了 Scala 开发者除了语法外最大的一个难题。

Spark是优化的大数据工具(平台),希望能和各位爱好者多交流、学习。

相关文章: