accumulator
accumulator
比如需要对Driver端的某个变量做累加操作,累加说的就是,数值相加或字符串的拼接,直接使用foreach是实现不了的,因为该算子无法把Executor累加的结果聚合到Driver端,可以使用Accumulator来实现累加的操作
注意:
- Accumulator只能实现累加,而且只能为Driver端的变量做累加
- Executor无法读取累加的值,只能Driver端读取
调用上下文指定accumulator方法可以实现累加(sc.accumulator(0)),该方法在Spark2.0版本后是过期方法,2.0之后需要自定义Accumulator,必须继承AccumulatorV2抽象类,重写6个方法。
例子:
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, Sparkcontext}
/**
* @Description: 用2.0版本的AccumulatorV2实现累加
* @Date: 2019/1/7 10:19
* @Auther: Dale
* @Version: 1.0
*/
object AccumulatorDemo02 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAPPName("AccumulatorDemo02").setMaster("local[2]")
val sc = new SparkContext(conf)
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
//创建自定义Accumulator
val accumulator = new AccumulatorTest
//注册累加器
sc.register(accumulator,"acc")
//不能调用transformation算子进行累加,会出现无法累加的情况
// nums.foreach(x=>accumulator.add(x))
nums.foreach(accumulator.add)
println(accumulator.value)
sc.stop()
}
}
/**
* @Description: 在继承AccumulatorV2的时候需要指定泛型,给定输入和输出的类型,然后重写方法
* @Date: 2019/1/7 10:19
* @Auther: Dale
* @Version: 1.0
*/
class AccumulatorTest extends AccumulatorV2[Int, Int] {
//初始化输出变量
var sum: Int = _
//判断初始值是否为空
override def isZero: Boolean = sum == 0
//copy一个新的累加器
override def copy(): AccumulatorV2[Int, Int] = {
//需要创建当前自定义的累加器对象
val acc = new AccumulatorTest
//需要将当前数据拷贝到新的累加器数据里面
//将原有累加器中的数据copy到新的累加器数据中
acc.sum = this.sum
acc
}
//重置一个累加器,将累加器中的数据初始化
override def reset(): Unit = sum = 0
//给定具体累加的过程,属于每一个分区进行累加的方法(局部累加方法)
override def add(v: Int): Unit = {
//v就是该分区中的某个元素
sum += v
}
//全局累加,合并每一个分区的累加值
override def merge(other: AccumulatorV2[Int, Int]): Unit = this.sum += other.value
//输出值
override def value: Int = sum
}
Spark实现了一些简单的用于数值累加的累加器,不用再去定义