accumulator
技术交流qq群: 659201069
鄙人的新书《elasticsearch7完全开发指南》,欢迎订阅!
https://wenku.baidu.com/view/8ff2ce94591b6bd97f192279168884868762b8e7
《kibana权威指南》
https://wenku.baidu.com/view/24cfee1ce43a580216fc700abb68a98270feac21
<spark累加器accumulator是spark提共的两种共享变量(广播变理和累加器)的一种。为什么要使用共享变量呢?通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传,也就是说有结果Driver程序是拿不到的!共享变量就是为了解决这个问题。本博文介绍其中的一种累加器Accumulator。
累加器只能够增加。 只有driver能获取到Accumulator的值(使用value方法),Task(excutor)只能对其做增加操作(使用 +=)。下面是累加器的实现代码
def accumulator[T](initialValue: T,name: String)(implicit param: org.apache.spark.AccumulatorParam[T]): org.apache.spark.Accumulator[T]
第一个参数应是数值类型,是累加器的初始值,第二个参数是该累加器的命字,这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。
下面看行累加器具体的例子:
val accum = sc.longAccumulator("longAccum") //统计奇数的个数
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
if(n%2!=0) accum.add(1L)
n%2==0
}).reduce(_+_)
println("sum: "+sum)
println("accum: "+accum.value)
结果为:
sum: 20
accum: 5
这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程和运算模型理解的不够深入就会遇到意想不到的错误。
下面看错误的情况:
val accum= sc.accumulator(0, "ERROR Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
if(x%2 == 0){
accum += 1
0
}else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value
//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value
spark中的一系列transform操作会构成DAG,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。
所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。
之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。
这种问题如何解决呢?看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。什么方法有这种功能呢?你们肯定都想到了,cache,persist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。
//
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value
newData.foreach(println)
//此时的accum依旧是5
accum.value
总之,使用Accumulator时,为了保证准确性,只使用一次action操作。
文章最后发布于: 2018-05-07 17:04:11
相关阅读
shuffle过程:由ShuffleManager负责,计算引擎HashShuffleManager(Spark 1.2)—>SortShuffleManagerspark根据shuffle类算子进行stage的
spark2.1.0自定义累加器AccumulatorV2的使用
类 类继承AccumulatorV2 class MyAccumulatorV2 extends AccumulatorV2[String, String] 覆写抽象方法: /** * @author lcjasa
public class AccumulatorDemo { public static void main(String[]args){ System.setProperty("hadoop.home.dir",
第9章Spark 2.1.0新一代Tungsten优化引擎彻底解析
第9章 Spark 2.1.0新一代Tungsten优化引擎彻底解析 1.1 概述Spark作为一个一体化多元化的大数据处理通用平台,性能
Accumulator 比如需要对Driver端的某个变量做累加操作,累加说的就是,数值相加或字符串的拼接,直接使用foreach是实现不了的,因为该算