必威体育Betway必威体育官网
当前位置:首页 > IT技术

Spark累加器(Accumulator)使用详解

时间:2019-11-03 18:41:10来源:IT技术作者:seo实验室小编阅读:86次「手机版」
 

accumulator

技术交流qq群: 659201069

鄙人的新书《elasticsearch7完全开发指南》,欢迎订阅!

https://wenku.baidu.com/view/8ff2ce94591b6bd97f192279168884868762b8e7

《kibana权威指南》

https://wenku.baidu.com/view/24cfee1ce43a580216fc700abb68a98270feac21

<spark累加器accumulator是spark提共的两种共享变量(广播变理和累加器)的一种。为什么要使用共享变量呢?通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传,也就是说有结果Driver程序是拿不到的!共享变量就是为了解决这个问题。本博文介绍其中的一种累加器Accumulator。

累加器只能够增加。 只有driver能获取到Accumulator的值(使用value方法),Task(excutor)只能对其做增加操作(使用 +=)。下面是累加器的实现代码

def accumulator[T](initialValue: T,name: String)(implicit param: org.apache.spark.AccumulatorParam[T]): org.apache.spark.Accumulator[T] 

第一个参数应是数值类型,是累加器的初始值,第二个参数是该累加器的命字,这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。

下面看行累加器具体的例子:

val accum = sc.longAccumulator("longAccum") //统计奇数的个数  
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{  
  if(n%2!=0) accum.add(1L)   
  n%2==0  
}).reduce(_+_)  
  
println("sum: "+sum)  
println("accum: "+accum.value)  

结果为:

sum: 20

accum: 5

这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程和运算模型理解的不够深入就会遇到意想不到的错误。

下面看错误的情况:

val accum= sc.accumulator(0, "ERROR Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
  if(x%2 == 0){
    accum += 1
      0
    }else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value

//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value

spark中的一系列transform操作会构成DAG,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。

所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。

之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

这种问题如何解决呢?看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。什么方法有这种功能呢?你们肯定都想到了,cachepersist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。

//
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)

//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value

newData.foreach(println)
//此时的accum依旧是5
accum.value

总之,使用Accumulator时,为了保证准确性,只使用一次action操作。

文章最后发布于: 2018-05-07 17:04:11

相关阅读

关于spark shuffle过程的理解

shuffle过程:由ShuffleManager负责,计算引擎HashShuffleManager(Spark 1.2)—>SortShuffleManagerspark根据shuffle类算子进行stage的

spark2.1.0自定义累加器AccumulatorV2的使用

类 类继承AccumulatorV2 class MyAccumulatorV2 extends AccumulatorV2[String, String] 覆写抽象方法: /** * @author lcjasa

累加器(Accumulator)

public class AccumulatorDemo { public static void main(String[]args){ System.setProperty("hadoop.home.dir",

第9章Spark 2.1.0新一代Tungsten优化引擎彻底解析

第9章  Spark 2.1.0新一代Tungsten优化引擎彻底解析  1.1      概述Spark作为一个一体化多元化的大数据处理通用平台,性能

Accumulator

Accumulator 比如需要对Driver端的某个变量做累加操作,累加说的就是,数值相加或字符串的拼接,直接使用foreach是实现不了的,因为该算

分享到:

栏目导航

推荐阅读

热门阅读