必威体育Betway必威体育官网
当前位置:首页 > IT技术

累加器(Accumulator)

时间:2019-08-02 21:42:11来源:IT技术作者:seo实验室小编阅读:63次「手机版」
 

accumulator

public class accumulatorDemo {
    public static void main(String[]args){
        System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");
        SparkConf conf=new SparkConf().setAPPName("AccumulatorDemo").setMaster("local");
        javaSparkcontext sc=new JavaSparkContext(conf);
        Accumulator<Long> acc=sc.accumulator(99L,new LongAccumulator());
        List<Long> seq=Arrays.asList(1L,2L,3L,4L);
        JavaRDD<Long> rdd=sc.parallelize(seq);
        rdd.foreach(new VoidFunction<Long>(){
            @Override
            public void call(Long arg0) throws Exception {
                acc.add(arg0);
            }
        });
        System.out.println(acc.value());;
    }}
public class LongAccumulator implements AccumulatorParam<Long>{
    /*
     * init 就是SparkContext.accumulator(init)参数init。
     * 这里的返回值是累计的起始值。注意,他可以不等于init。
     *
     * 如果init=10,zero(init)=0,那么运算过程如下:
     * v1:=0+step
     * v1:=v1+step
     * ...
     * ...
     * 最后v1:=v1+init
     **/
    @Override
    public Long zero(Long init) {
        System.out.println("init"+init);
        return init;
    }

    @Override
    public Long addAccumulator(Long value, Long step) {
        System.out.println(value+","+step);
        return value+step;
    }
    //执行完addAccumulator方法之后,最后会执行这个方法,将value加到init。
    @Override
    public Long addInPlace(Long init, Long value) {
        System.out.println(init+":"+value);
        return init+value;
    }
}

相关阅读

Spark累加器(Accumulator)使用详解

技术交流qq群: 659201069 鄙人的新书《elasticsearch7完全开发指南》,欢迎订阅! https://wenku.baidu.com/view/8ff2ce94591b6bd97f1

spark2.1.0自定义累加器AccumulatorV2的使用

类 类继承AccumulatorV2 class MyAccumulatorV2 extends AccumulatorV2[String, String] 覆写抽象方法: /** * @author lcjasa

分享到:

栏目导航

推荐阅读

热门阅读