博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大数据 — Spark笔记2
阅读量:4344 次
发布时间:2019-06-07

本文共 3071 字,大约阅读时间需要 10 分钟。

Spark(笔记)

RDD:

RDD是什么?

  • 弹性
    • 内存和磁盘之间同步数据
    • RDD可以变成另外一个RDD
    • RDD内部存储数据类型丰富
  • 存什么数据?
    • RDD不存数据,只存数据的分区信息和读取方法(HDFS,其他RDD)
    • 依赖(向上依赖):
      • 顶部RDD
      • 非顶部RDD:记录自己来源于谁 — lineage
    • 宽依赖:有shuffle
    • 窄依赖:无shuffle
    • 失效问题?:
      • 窄依赖:只需要计算丢失RDD分区的父分区,而且不同节点之间可以并行
      • 宽依赖:单个节点失效,可能导致这个RDD所有的祖先丢失部分的分区重新计算
  • 怎么读取?
    • 有存储级别:先判断缓存是否有数据,如果有,直接读,如果没有,从磁盘读
    • 无存储级别:直接从磁盘读

参数:

内存分配:

Executor的内存分为3块:

  • executor内存【20%】:执行内存,join,aggregate都在这部分内存执行,shuffle也会缓存这个内存区,如果内存区满,写磁盘
  • storage内存【60%】:cache,persist
  • other内存:留给应用程序自己的,通常占用空间比较少

一般,executor和storage内存比较大

  • spark-1.6.0以前的版本,每类内存相互隔离,导致内存利用率不高,限制了app运行
  • spark-1.6.0以上的版本,executor和storage内存之间是可以互相借用,提高利用率,减少了OOM(out of memory)的情况

提交类参数:

  • executor - memory:每个executor内存多大
    • 执行内存:可以设置大一些,80%
    • 需要同时考虑driver-memory的设置
      一般driver不会做任何计算和存储,只是下发任务和task交互,通常默认1G,除非把不同节点数据全部都汇集(collect)到总控上,此时设置大一些
  • num - executor:多少个executor进程

  • executor - cores:每个executor有多少个core

    • cores最好不要设为1,一般设置2~4个

executor-cores * num-executors = 一共需要多少虚拟core,一般不要超过总队列的25%

举例:队列总cores400,最大不要超过100,也不建议低于10%(40)

提交任务(spark-submit)

提交任务需要Context上下文,SparkContext(SC)在Spark应用程序的执行过程中起到主导作用,负责Spark集群进行交互(申请资源,创建RDD,广播)

. / bin / spark-submit \--master yarn-cluster \--num-executors 100 \--executor-memory 6 G \--executor-cores 4 \--driver-memory 1 G \--conf spark.default.parallelism = 1000 \--conf spark.storage.memoryFraction = 0.5 \--conf spark.shuffle.memoryFraction = 0.3 \

 

Spark开发调优?

  • 避免创建重复RDD
    • 对同一份数据,之应该创建一个RDD,不能创建多个RDD来代表同一份数据
    • 极大浪费内存

  • 尽可能复用同一个RDD
    • 比如:一个RDD数据格式是k-v,另一个是单独value类型,这两个RDD的value部分完全一样,这样可以复用,达到减少算子执行次数
#优化前:JavaPairRdd
rdd1 = ... JavaRdd
rdd2 = rdd1.map(...)#优化后:JavaPairRdd
rdd1 = ... rdd1.reduceByKey(...)rdd1.map(tuple._2...)

  • 对多次使用的RDD进行持久化处理
    • 每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一边,计算出那个RDD出来,然后进一步操作,这种方式性能很差
    • 对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动
    • 借助cache() 和persist() 方法
#cache() ://只会缓存到内存val rdd1 = sc.textFile(...).cache()rdd1.map(...)rdd1.reduce(...)

cache() 是 persist() 的特殊形式 — MEMORY_ONLY

#persist() :val rdd1 = sc.textFile(...).persist(StorageLevel.MEMORY_AND_DISK_SER)//内存充足以内存持久化优先,_SER表示序列化rdd1.map(...)rdd1.reduce(...)

 


  • 避免使用shuffle类算子
    • spark作业运行过程中,最小号性能的地方就是shuffle过程
    • 将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如groupByKey,reduceByKey,join等算子,都会触发shuffle
#优化前:val rdd3 = rdd1.join(rdd2)#优化后:val rdd2Data = rdd2.collect()   //打开val rdd2DataBroadcast = sc.vroadcast(rdd2Data)  //分发val rdd3 = rdd1.map(rdd2DataBroadcast)

Broadcast+map的join操作,不会导致shuffle操作,但前提适合R得到数量较少时使用


  • 使用map-side预聚合的shuffle操作
    • 一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子
    • 思想类似MapReduce中的Combiner
    • 可能的情况下用reduceByKey或者aggregateByKey算子替代groupByKey算子,因为reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本低相同key进行预聚合,而groupByKey算子不会预聚合

  • 使用Kryo优化序列化性能
    • Kryo是一个序列化类库,来优化序列化和反序列化性能
    • Spark默认使用Java序列化机制(ObjectOutputStream/ObjectInputStreamAPI)进行序列 化和反序列化
    • Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右
//创建SparkConf对象val conf = new SparkConf().setMaster(...)setAppName(...)//设置序列化器为KryoSerializerconf.set("spark,serializer","org.apache.spark.serializer,KryoSerializer")//注册要序列化的自定义类型conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))

转载于:https://www.cnblogs.com/pipemm/articles/11380652.html

你可能感兴趣的文章
USACO 4.4.2 追查坏牛奶 oj1341 网络流最小割问题
查看>>
_016_信号
查看>>
[学习笔记] CS131 Computer Vision: Foundations and Applications:Lecture 1 课程介绍
查看>>
解决:对COM组件的调用返回了错误HRESULT E_FAIL
查看>>
DHTMLX 前端框架 建立你的一个应用程序 教程(五)--添加一个表格Grid
查看>>
XML的四种解析器原理及性能比较
查看>>
233 Matrix HDU - 5015 矩阵递推
查看>>
在局域网中发布 https证书问题
查看>>
iframe自适应高度计算,iframe自适应
查看>>
【POJ 3169 Layout】
查看>>
call()和原型继承的方法
查看>>
UVALive - 6893 The Big Painting 字符串哈希
查看>>
总结五个在办公中使用很爽的软件
查看>>
使用对象属性
查看>>
PWM输出,呼吸灯
查看>>
QTcpSocket发送结构体
查看>>
redmine install 菜鸟的艰难历程
查看>>
总结2:指出代码中的错误
查看>>
[唐胡璐]Selenium技巧- 抓图并保存到TestNG报告中
查看>>
P1352 没有上司的舞会
查看>>