必威体育Betway必威体育官网
当前位置:首页 > IT技术

Java高并发(四)——ThreadPool,线程复用

时间:2019-09-07 06:44:25来源:IT技术作者:seo实验室小编阅读:72次「手机版」
 

threadpool

前边我们讲述了:java高并发——了解并行世界、Java高并发——多线程基础、Java高并发——多线程协作,同步控制 。从1,线程是什么?为什么需要多线程?2,Java对多线程的基础操作:线程的状态扭转,线程的创建、终止、中断、等待和通知、挂起和执行、等待结束和谦让,volatile关键字,线程组进行分类管理,守护线程,线程优先级,线程共享资源安全和synchronized进行控制。3,多线程的协作——同步控制:ReentrantLock重入锁为什么叫重入锁、中断的支持、等待时间控制、公平锁支持,Condition条件实现等待通知,Semahore信号量对一个共享资源控制多个线程的访问,ReadWriteLock读写锁对读多写少场景的支持,countdownlatch倒计时器对线程的统一汇总,CyclicBarrier循环栅栏对线程进行循环统一汇总的支持,LockSupport利用信号量原理对线程进行阻塞和可执行……  而,这么多线程的操作都有了,我们如何管理这些线程呢?

  举个例子:工人干活,工人可以比喻为线程,工人如何干活好比单个线程的各种处理,工人如何分配公共工具,如何进行协作好比线程对共享资源的安全处理,以及各种协作通知。而工人的管理怎么办?线程的管理怎么办?总不能用的时候去招聘一个,不用的时候就解雇了吧(如果工作量不大的话,好像也行啊……)。好,下边我么看看线程池,好比工人团队。我们先看张思维导图吧,从这几个方便进行学习总结一下吧:

一,什么是线程池:

  这个从上边的例子,想必已经能有很好的理解了。想必大家都用过数据库连接池,其实线程池也一样:为了避免系统频繁的创建和销毁线程(招聘解雇工人)带来的性能消耗,可以让线程得到很好的复用。当需要使用线程时从线程池取(原来是创建),当用完时归还线程池(原来是销毁)。

二,JDK中的线程池:

1,先看下JDK对线程池进行各种控制的类关系图(Executor框架),可以看着JDK的源码类,进行解读。通过Executors类方法可以看出,它的方法ExecutorService newFixedThreadPool(int nThreads)可以创建固定数量的线程池;方法ExecutorService newSingleThreadExecutor()可以创建只有一个线程的线程池;方法ExecutorService newcachedThreadPool() 可以返回一个根据实际情况调整线程数量的线程池;方法scheduledexecutorservice newSingleThreadScheduledExecutor()可以返回一个可以执行定时任务线程的线程池;方法ScheduledExecutorService newScheduledThreadPool(int corePoolSize)可以返回指定数量可以执行定时任务的线程池。当然这些方法都有重载方法,例如参数中加入自定义的ThreadFacotry等。

2,看下固定大小线程池的简单使用:

public class FixThreadPoolDemo {

    public static class MyTask implements Runnable{

        public void run() {
            System.out.println(system.currenttimemillis() + "Thread Name:" + Thread.currentThread().getName());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printstacktrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        int size =5;
        //下篇说下阿里技术规范插件对这个的提示问题
//        ExecutorService executorService = new ThreadPoolExecutor(size,size,0L, TimeUnit.MILLISECONDS,new Linkedblockingqueue<Runnable>());
//        ThreadFactory namedThreadFactory = new ThreadFactorybuilder().setNameFormat("thread-call-runner-%d").build();
//        ExecutorService executorService2 = new ThreadPoolExecutor(size,size,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),namedThreadFactory);

        ExecutorService es = Executors.newFixedThreadPool(size);
        for (int i = 0; i < 10 ; i++) {
            es.submit(myTask);
        }

    }

}

3,看下定时任务线程池的简单使用:

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        /**
         * 1,executorService.scheduleatfixedrate:创建一个周期性任务,从上个任务开始,过period周期执行下一个(如果执行时间>period,则以执行时间为周期)
         * 2,executorService.scheduleWithFixedDelay:创建一个周期上午,从上个任务结束,过period周期执行下一个。
         */
        //如果前边任务没有完成则调度也不会启动
        executorService.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("当前时间:" + System.currentTimeMillis()/1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

三,线程池的实现原理:

1,通过看JDK源码可以知道上边五种类型的线程池创建无论哪种最后都是ThreadPoolExecutor类的封装,我们来看下ThreadPoolExecutor最原始的构造函数,和调度execute源码:

/**
 * 1,corePoolSize:指定线程池中活跃的线程数量
 * 2,maximumPoolSize:指定线程池中最大线程数量
 * 3,keepAliveTime:超过corePoolSize个多余线程的存活时间
 * 4,unit:keepAliveTime的时间单位
 * 5,workQueue:任务队列,被提交但尚未被执行的任务
 * 6,threadFactory:线程工厂,用于创建线程
 * 7,handler拒绝策略:当任务太多来不及处理时,如何拒绝任务
 */
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedexecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new illegalargumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                Accesscontroller.getcontext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }



/**
 * 三步:1,创建线程直到corePoolSize;2,加入任务队列;3,如果还是执行不过来,则执行拒绝策略
 */
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

2,workQueue,当任务被提交但尚未被执行的任务队列,是一个BlockingQueue接口的对象,只存放Runnable对象。根据队列功能分类,看下JDK提供的几种BlockingQueue:

BlockingQueue分类说明
work queue class name 说明
SynchronousQueue 直接提交队列:没有容量,每一个插入操作都要等待一个相应的删除操作。通常使用需要将maximumPoolSize的值设置很大,否则很容易触发拒绝策略。
ArrayBlockingQueue 有界的任务队列:任务大小通过入参 int capacity决定,当填满队列后才会创建大于corePoolSize的线程。
LinkedBlockingQueue 无界的任务队列:线程个数最大为corePoolSize,如果任务过多,则不断扩充队列,知道内存资源耗尽。
priorityBlockingQueue 优先任务队列:是一个无界的特殊队列,可以控制任务执行的先后顺序,而上边几个都是先进先出的策略。

3,ThreadFactory,是用来创建线程池中的线程工厂类,我们来看下JDK中的默认DefaultThreadFactory类,并自己写个试试:

/**
 * The default thread factory
 */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final Atomicinteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setpriority(Thread.NORM_PRIORITY);
            return t;
        }
    }



/**
 * 自定义试试
 */
public class ThreadFactoryDemo {
    public static class MyTask implements Runnable{

        public void run() {
            System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        FixThreadPoolDemo.MyTask myTask = new FixThreadPoolDemo.MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t =new Thread(r);
                t.setDaemon(true);
                System.out.println("create thread" + t.getName());
                return  t;
            }
        });
        for (int i = 0; i < 10 ; i++) {
            es.submit(myTask);
        }

    }
}

4,拒绝策略:如果线程池处理速度达不到任务的出现速度时,只能执行拒绝策略,看下JDK提供几种,然后自定义看个例子:

JDK提供的线程池拒绝策略
策略名称 描述
AbortPolicy 该策略会直接抛出异常,阻止系统正常 工作。线程池默认为此。
CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOledestPolicy 该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试重新提交当前任务。
DiscardPolicy 该策略默默地丢弃无法处理的任务,不予任务处理。
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable{

        public void run() {
            System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        FixThreadPoolDemo.MyTask myTask = new FixThreadPoolDemo.MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + "is discard");
            }
        });
        for (int i = 0; i < 20 ; i++) {
            es.submit(myTask);
        }

    }
}

四,线程池的扩展:JDK已经对线程池做了非常好的编写,如果我们想扩展怎么办呢?ThreadPoolExecutor提供了三个方法供我们使用:beforeExecute()每个线程执行前,afterExecute()每个线程执行后,terminated()线程池退出时。我们只要对这个三方法进行重写即可:

public class ExtThreadPoolDemo {
    public static class MyTask implements Runnable{

        public void run() {
            System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FixThreadPoolDemo.MyTask myTask = new FixThreadPoolDemo.MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10)){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行线程:" + r.toString() +"==="  + t.getName());
            }

            @Override
            protected void afterExecute(Runnable r, throwable t) {
                System.out.println("执行完成线程:" + r.toString());
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出" );
            }
        };
        for (int i = 0; i < 10 ; i++) {
            es.submit(myTask);
        }
        Thread.sleep(3000);
        es.shutdown();
    }
}

五,线程数量的优化:线程池的大小对系统性能有一定的影响,过大或者过小都无法发挥系统的最佳性能。但是也没有必要做的特别精确,只是不要太大,不要太小即可。我们可以根据此公式进行粗略计算:线程池个数=cpu的数量*CPU的使用率*(1+等待时间/计算时间)。当然了还需要根据实际情况,积累实际经验,来进行判断。

六,线程池中的堆栈信息,我们通过下边两个例子进行查看:

/**
 * 会吃掉一些异常信息,非常不友好
 */
public class ExceptionThreadPoolDemo {
    public static class MyTask implements Runnable {
        int a, b;

        public MyTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        public void run() {

            double re = a / b;
            System.out.println(re);

        }
    }
    public static void main(String[] args) {
        //ExecutorService es = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
        ExecutorService es = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            //不进行日志打印
            //es.submit(new MyTask(100,i));
            //进行日志打印,只是打印了具体方法错误:Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
            //	at com.ljh.thread.thread_pool.ExceptionThreadPoolDemo$MyTask.run(ExceptionThreadPoolDemo.java:24)
            //	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            //	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            //	at java.lang.Thread.run(Thread.java:748)
            es.execute(new MyTask(100,i));
        }

    }
}




/**
 * 通过自定自己的线程池,重写execute和submit方法,将异常打印出来:
 */
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command,ljhTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,ljhTrace(),Thread.currentThread().getName()));
    }

    private Exception ljhTrace(){
        return new Exception("ljh-stack-trace");
    }

    private Runnable wrap(final Runnable task , final  Exception ljhException,String threadName){
        return  new Runnable() {
            public void run() {
                try {
                    task.run();
                }catch (Exception e){
                    ljhException.printStackTrace();
                     e.printStackTrace();
                }

            }
        };
    }
}

七,最后看下分而治之:Fork/Join:大家都知道hadoop中的Map-Reduce分开处理,合并结果;当今流行的分布式,将用户的请求分散处理等等。分而治之是非常有用实用的。JDK帮我们提供了ForkJoinPool线程池,供我们做这些处理,有两个子类供我们使用,Recursive有返回值,RecursiveAction无返回值,看个例子吧:

public class ForkjointhreadPoolDemo extends RecursiveTask<Long> {

    private static final int threshold = 10000;
    private long start;
    private long end;

    public ForkJoinThreadPoolDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i < end; i++) {
                sum += i;
            }
        } else {
            //分成100份进行处理
            long step = (start + end) / 50;
            ArrayList<ForkJoinThreadPoolDemo> subTasks = new ArrayList<ForkJoinThreadPoolDemo>();
            long pos = start;
            for (int i = 0; i < 50; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                    ForkJoinThreadPoolDemo subTask = new ForkJoinThreadPoolDemo(pos, lastOne);
                    pos += step;
                    subTasks.add(subTask);
                    subTask.fork();
            }
            for (ForkJoinThreadPoolDemo t : subTasks) {
                sum += t.join();
            }
        }
        return sum;
    }

    //结果199990000
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinThreadPoolDemo task = new ForkJoinThreadPoolDemo(0, 20000);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);

        try {
            long res = result.get();
            System.out.println("结果" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}

好,JDK线程池的知识就到这吧,困了,睡觉了……

相关阅读

Thread、ThreadPool

Thread、ThreadPool 同步编程与异步编程 同步编程:在没有使用线程的情况下,我们编写的代码都是顺序执行的,都是在主线程上执行。 异

定时任务ScheduledThreadPoolExecutor的使用详解

定时任务ScheduledThreadPoolExecutor的使用详解前短时间需要用到一个定时器处理蓝牙设备接收的数据,并且需要处理的频率是很快的,

线程池原理(四):ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor用于定时任务,这里的定时意义在于: 指定延时后执行任务。 周期性重复执行任务。 我们接着分析Sched

线程池之 newScheduledThreadPool中scheduleAtFixedRa

说明:在处理消费数据的时候,统计tps,需要用一个线程监控来获得tps值,则使用了定时任务的线程池中的方法 scheduleAtFixedRate(),此方法

难道调用ThreadPool.QueueUserWorkItem()的时候,真是必须

原文地址为:难道调用ThreadPool.QueueUserWorkItem()的时候,真是必须调用Thread.Sleep(N)吗?开门见山,下面的例子中通过调用ThreadPool.Qu

分享到:

栏目导航

推荐阅读

热门阅读