accumulator
public class accumulatorDemo {
public static void main(String[]args){
System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");
SparkConf conf=new SparkConf().setAPPName("AccumulatorDemo").setMaster("local");
javaSparkcontext sc=new JavaSparkContext(conf);
Accumulator<Long> acc=sc.accumulator(99L,new LongAccumulator());
List<Long> seq=Arrays.asList(1L,2L,3L,4L);
JavaRDD<Long> rdd=sc.parallelize(seq);
rdd.foreach(new VoidFunction<Long>(){
@Override
public void call(Long arg0) throws Exception {
acc.add(arg0);
}
});
System.out.println(acc.value());;
}}
public class LongAccumulator implements AccumulatorParam<Long>{
/*
* init 就是SparkContext.accumulator(init)参数init。
* 这里的返回值是累计的起始值。注意,他可以不等于init。
*
* 如果init=10,zero(init)=0,那么运算过程如下:
* v1:=0+step
* v1:=v1+step
* ...
* ...
* 最后v1:=v1+init
**/
@Override
public Long zero(Long init) {
System.out.println("init"+init);
return init;
}
@Override
public Long addAccumulator(Long value, Long step) {
System.out.println(value+","+step);
return value+step;
}
//执行完addAccumulator方法之后,最后会执行这个方法,将value加到init。
@Override
public Long addInPlace(Long init, Long value) {
System.out.println(init+":"+value);
return init+value;
}
}
相关阅读
技术交流qq群: 659201069 鄙人的新书《elasticsearch7完全开发指南》,欢迎订阅! https://wenku.baidu.com/view/8ff2ce94591b6bd97f1
spark2.1.0自定义累加器AccumulatorV2的使用
类 类继承AccumulatorV2 class MyAccumulatorV2 extends AccumulatorV2[String, String] 覆写抽象方法: /** * @author lcjasa