Spark(笔记)
RDD:
RDD是什么?
- 弹性
- 内存和磁盘之间同步数据
- RDD可以变成另外一个RDD
- RDD内部存储数据类型丰富
- 存什么数据?
- RDD不存数据,只存数据的分区信息和读取方法(HDFS,其他RDD)
- 依赖(向上依赖):
- 顶部RDD
- 非顶部RDD:记录自己来源于谁 — lineage
- 宽依赖:有shuffle
- 窄依赖:无shuffle
- 失效问题?:
- 窄依赖:只需要计算丢失RDD分区的父分区,而且不同节点之间可以并行
- 宽依赖:单个节点失效,可能导致这个RDD所有的祖先丢失部分的分区重新计算
- 怎么读取?
- 有存储级别:先判断缓存是否有数据,如果有,直接读,如果没有,从磁盘读
- 无存储级别:直接从磁盘读
参数:
内存分配:
Executor的内存分为3块:- executor内存【20%】:执行内存,join,aggregate都在这部分内存执行,shuffle也会缓存这个内存区,如果内存区满,写磁盘
- storage内存【60%】:cache,persist
- other内存:留给应用程序自己的,通常占用空间比较少
一般,executor和storage内存比较大
- spark-1.6.0以前的版本,每类内存相互隔离,导致内存利用率不高,限制了app运行
- spark-1.6.0以上的版本,executor和storage内存之间是可以互相借用,提高利用率,减少了OOM(out of memory)的情况
提交类参数:
- executor - memory:每个executor内存多大
- 执行内存:可以设置大一些,80%
- 需要同时考虑driver-memory的设置 一般driver不会做任何计算和存储,只是下发任务和task交互,通常默认1G,除非把不同节点数据全部都汇集(collect)到总控上,此时设置大一些
-
num - executor:多少个executor进程
-
executor - cores:每个executor有多少个core
- cores最好不要设为1,一般设置2~4个
executor-cores * num-executors = 一共需要多少虚拟core,一般不要超过总队列的25%
举例:队列总cores400,最大不要超过100,也不建议低于10%(40)
提交任务(spark-submit)
提交任务需要Context上下文,SparkContext(SC)在Spark应用程序的执行过程中起到主导作用,负责Spark集群进行交互(申请资源,创建RDD,广播)
. / bin / spark-submit \--master yarn-cluster \--num-executors 100 \--executor-memory 6 G \--executor-cores 4 \--driver-memory 1 G \--conf spark.default.parallelism = 1000 \--conf spark.storage.memoryFraction = 0.5 \--conf spark.shuffle.memoryFraction = 0.3 \
Spark开发调优?
- 避免创建重复RDD
- 对同一份数据,之应该创建一个RDD,不能创建多个RDD来代表同一份数据
- 极大浪费内存
- 尽可能复用同一个RDD
- 比如:一个RDD数据格式是k-v,另一个是单独value类型,这两个RDD的value部分完全一样,这样可以复用,达到减少算子执行次数
#优化前:JavaPairRddrdd1 = ... JavaRdd rdd2 = rdd1.map(...)#优化后:JavaPairRdd rdd1 = ... rdd1.reduceByKey(...)rdd1.map(tuple._2...)
- 对多次使用的RDD进行持久化处理
- 每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一边,计算出那个RDD出来,然后进一步操作,这种方式性能很差
- 对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动
- 借助cache() 和persist() 方法
#cache() ://只会缓存到内存val rdd1 = sc.textFile(...).cache()rdd1.map(...)rdd1.reduce(...)
cache() 是 persist() 的特殊形式 — MEMORY_ONLY
#persist() :val rdd1 = sc.textFile(...).persist(StorageLevel.MEMORY_AND_DISK_SER)//内存充足以内存持久化优先,_SER表示序列化rdd1.map(...)rdd1.reduce(...)
- 避免使用shuffle类算子
- spark作业运行过程中,最小号性能的地方就是shuffle过程
- 将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如groupByKey,reduceByKey,join等算子,都会触发shuffle
#优化前:val rdd3 = rdd1.join(rdd2)#优化后:val rdd2Data = rdd2.collect() //打开val rdd2DataBroadcast = sc.vroadcast(rdd2Data) //分发val rdd3 = rdd1.map(rdd2DataBroadcast)
Broadcast+map的join操作,不会导致shuffle操作,但前提适合R得到数量较少时使用
- 使用map-side预聚合的shuffle操作
- 一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子
- 思想类似MapReduce中的Combiner
- 可能的情况下用reduceByKey或者aggregateByKey算子替代groupByKey算子,因为reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本低相同key进行预聚合,而groupByKey算子不会预聚合
- 使用Kryo优化序列化性能
- Kryo是一个序列化类库,来优化序列化和反序列化性能
- Spark默认使用Java序列化机制(ObjectOutputStream/ObjectInputStreamAPI)进行序列 化和反序列化
- Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右
//创建SparkConf对象val conf = new SparkConf().setMaster(...)setAppName(...)//设置序列化器为KryoSerializerconf.set("spark,serializer","org.apache.spark.serializer,KryoSerializer")//注册要序列化的自定义类型conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))