Born to be proud
3/29
2018

Spark 笔记

Spark 特点

  • Spark是快速的. Spark是基于内存的计算,扩充了MapReduce.
  • Spark是通用的.容纳了其他分布式系统的功能.批处理,迭代计算,交互查询和流处理.降低了维护成本.
  • Spark是高度开放的.提供了JAVA,Python,Scala,SQL的API和丰富的内置库.和其他大数据工具整合很好,hadoop,kafca.

Spark 组件

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark Mlib
  • Spark Graphx
  • Cluster Manager

Spark 组件紧密集成的优点

  • Spark底层优化了,基于Spark底层的组件,也得到相应的优化
  • 节省了各组件部署,测试时间
  • 向Spark增加新组件时,其他组件可立即共享新组件功能

Spark 与 Hadoop 比较

  • Hadoop主要应用于离线处理,对实时性要求不高的场景
  • Spark主要应用于对实时性要求较高的场景,机器学习等领域

Spark Shell

  • 处理分布在集群上的数据
  • 加载到节点内存中,运行速度是秒级的
  • 快速迭代计算,实时查询,分析一般可以在Shell中完成
  • 提供了 Python Shell(bin/pyspark) 和 Scala Shell(bin/spark-shell)
  • Scala Shell 修改日志级别 log4j.rootCategory = WARN, console

Spark 开发环境搭建

  • 下载 Spark
  • 下载 Intellj IDEA(注册码地址: http://idea.lanyus.com)
  • 插件安装:Plugins,搜索Scala直接安装,插件中有scala和sbt

    //配置免密登录
    touch authorized_keys
    cat id_rsa.pub > authorized_keys
    chmod 600 authorized_keys

Spark与Scala版本匹配

  • Spark1.6.2 - Scala2.10
  • Spark2.0.0 - Scala2.11

项目创建

//java home 目录, jdk与scala版本
 /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/bin

//配置sbt
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2"

//写代码

//配置jar包
进入 File > Project Structure > Artifacts > + > JARs > From Module... > 第二个选项

//build

Spark 相关命令

启动 master
./sbin/start_master.sh
:quit

启动 worker
./bin/spark-class

hdfs dfs -rm -R /user/algo/liujiaqi/wordcount-output-spark/

提交作业
//单机
spark-submit --master yarn-client --class WordCount myspark.jar

//集群
spark-submit --master yarn-cluster --conf spark.storage.memoryFraction=0.7 --conf spark.shuffle.memoryFraction=0.3 --conf spark.broadcast.blockSize=4096 --conf spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory --conf spark.broadcast.compress=true --conf spark.serializer=org.apache.spark.serializer.JavaSerializer --conf spark.akka.threads=8 --conf spark.default.parallelism=63 --conf spark.port.maxRetries=100 --class WordCount  myspark.jar

//查看结果
hdfs dfs -cat /user/algo/liujiaqi/wordcount-output-spark/part-00000

//查看日志
yarn logs -applicationId xxxx

RDDs 弹性分布式数据集

  • Driver programs: 包含程序main方法,RDDs的定义和操作。管理很多节点,称为executors。
    image

  • SparkContex: Driver programs通过SparkContex对象访问Spark,SparkContex代表和一个集群的连接,在Shell中SparkContex自动创建好了,就是sc

RDDs

image

  • 并行的分布在整个集群中,是Spark分发数据和计算的基础数据类
  • 一个RDD是一个不可改变的分布式集合对象
  • Spark 中所有的计算都是通过RDDs的创建,转换,操作完成的
  • 一个RDD内部由许多Partitions组成

RDDs 创建方法

  1. 把一个存在的集合传给SparkContex的parallelize()方法

    val rdd = sc.parallelize(Array(1,2,3,4),4)
    //第一个参数待处理的集合,第二个参数为分区个数
    
  2. 加载外部数据集

    val addText = sc.textFile("helloSpark.txt");
    

Scala 基础知识

  • 变量声明: 必须用 val(不可修改) 或 var(可改)
  • 匿名函数与类型推断

    lines.filter(line.contains("world"));
    //定义一个匿名函数,接受一个参数line
    

基本操作Transformation

  • map(): 把函数应用到RDD的每一个元素,返回新RDD
  • filter(): 返回只包含满足filter()函数的元素的新RDD
  • flatMap(): 对每个输入元素,输出多个输出元素,flat是压扁的意思,将RDD元素压扁后返回新RDD
  • 支持数据集合运算,例如并集交集计算

基本操作Action

  • reduce(): 接收一个函数,作用在RDD两个类型相同的元素上,返回新元素,实现累加计数等
  • collect(): 遍历整个RDD,向Driver programs返回RDD内容,需要单机内存能够容纳下,大数据时用saveAsTextFile
  • take(n): 返回RDD的n个元素(同时尝试访问最少partition),返回结果无需,测试使用
  • top(n): 排序(根据RDD中数据比较器),前n个
  • foreach(): 遍历每个元素,但不返回本地,可以配合println(),友好的打印出数据
  • reduceByKey
  • groupByKey
  • combineByKey(): 最常用的key的聚合函数,返回类型可以与输入类型不一样

RDDs特性

  • 血统关系图: Spark维护着RDDs间的依赖关系和创建关系,叫做血统关系图,使用血统关系图计算RDD需求,恢复丢失的数据
  • 延迟计算: Spark对RDDs的计算是在第一次使用action操作的时候,可以减少数据的传输
  • 持久化: 默认每次在RDDs上进行action操作时,Spark都重新计算RDDs.若想重复利用一个RDD,可以使用RDD.persist(), unpersist()从缓存中移除

DataFrame & DataSet

RDD

  1. RDD一般和spark mlib同时使用
  2. RDD不支持sparksql操作

DataFrame

  1. 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值
    testDF.foreach{
      line =>
        val col1=line.getAs[String]("col1")
        val col2=line.getAs[String]("col2")
    }
    
  2. DataFrame与Dataset一般与spark ml同时使用
  3. DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如
    dataDF.createOrReplaceTempView("tmp")
    spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
    
  4. DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

DataSet

  • 这里主要对比Dataset和DataFrame,因为Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同.
  • DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知
  • 而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

代码

var lines = sc.textFile("liu")
lines.foreach(println)
lines.count()
var lines2 = lines.foreach(println)
var lines3 = lines.map(word=>(word,1))
var lines4 = lines.fileter(word=>word.contains("hello"))
print(lines.first())
var lines2 = lines.map(line=>(line.split("\t")(34),line))

方法

newAPIHadoopRDD(path,format,key,value)