Java多线程-12(线程管理中的线程池)

                                                           线程池

                                                                                              个人博客:www.xiaobeigua.icu


1.2  线程池

        线程池就是有效使用线程的一种常用方式。线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行。

1.2.1什么是线程池

        可以以 new Thread( () -> { 线程执行的任务 }).start();(λ 表达式) 这种形式开启一个线程. 当 run()方法运行结束,线程对象会被 GC 释放。

        在真实的生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时 ,反而会耗尽 CPU 资源。 如果不对线程进行控制与管理,反而会影响程序的性能。

        线程主要开销包括:

                 创建与启动线程的开销;

                 线程销毁开销;

                 线程调度的开销;

        线程数量受限 CPU 处理器数量。

        线程池就是有效使用线程的一种常用方式。线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行。

                Java多线程-12(线程管理中的线程池)

                                         Java多线程-12(线程管理中的线程池)

1.2.2 JDK 对线程池的支持

JDK 提供了一套 Executor 框架,可以帮助开发人员有效的使用线程池

Java多线程-12(线程管理中的线程池)

        在java中,线程池有很多的实现类,一般情况下,我们可可以使用上图中的ThreadPoolExecutor实现类 或者Executors工具类来实现创建线程池 。更多线程池类后面会再分析

例子:使用 Executors工具类来创建和使用线程池

/**  * 线程池的基本使用  * 作者:小北呱  */ public class TestOne {     public static void main(String[] args) {          //使用Executors创建 线程数为5的线程池         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);          //创建15个任务分配给线程池         for (int i=0;i<15;i++){             //执行任务             fixedThreadPool.execute(new Runnable() {                 @Override                 public void run() {                     System.out.println(Thread.currentThread().getId()+"线程执行了任务....");                     try {                         Thread.sleep(2000);                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                 }             });         }      }  } 

 结果: 一个把15个任务交给了线程池,但是里面只有5个活动线程,所以一次性只能执行五个任务,身下的任务就会进入任务等待队列中等待

        Java多线程-12(线程管理中的线程池)

1.2.3 核心线程池的底层实现 

        查看 Executors 工 具 类 中 newCachedThreadPool(), newSingleThreadExcecutor(), newFixedThreadPool()源码:          

 注意:这几个方法是用来创建不同功能的线程池的

public static ExecutorService newCachedThreadPool() {      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  }
public static ExecutorService newFixedThreadPool(int nThreads) {          return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());  }
public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));  }

        发现 Excutor 工具类中返回线程池的方法底层都使用了 ThreadPoolExecutor 线程池,所以我们可以发现这些方法都是 ThreadPoolExecutor 线程池的封装

        我们再来看看ThreadPoolExecutor 的构造方法:

ThreadPoolExecutor 的构造方法:      public ThreadPoolExecutor(int corePoolSize,                                int maximumPoolSize,                                long keepAliveTime,                                 TimeUnit unit,                                BlockingQueue<Runnable> workQueue,                                 ThreadFactory threadFactory,                                RejectedExecutionHandler handler)

各个参数含义:

        corePoolSize, 指定线程池中核心线程的数量

        maxinumPoolSize,指定线程池中最大线程数量

        keepAliveTime,当线程池线程的数量超过 corePoolSize 时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁    unit, 是 keepAliveTime 时长单位

        workQueue,任务队列,把任务提交到该任务队列中等待执行

        threadFactory,线程工厂,用于创建线程 handler 拒绝策略,当任务太多来不及处理时,如何拒绝

        handler 拒绝策略,当任务太多来不及处理时,如何拒绝

workQueue 说明:

        workQueue 工作队列是指提交未执行的任务队列 , 它是 BlockingQueue 接口的对象,仅用于存储 Runnable 任务。根据队列功能分类,在ThreadPoolExecutor 构造方法中可以使用以下几种阻塞队列:

        1) 直接提交队列,Java多线程-12(线程管理中的线程池)

        2) 有界任务队列 ,由 ArrayBlockingQueue 对象提供 , 在 创 建 ArrayBlockingQueue 对象时,可以指定一个容量。当有任务需要执行时,如果线程池中线程数小于 corePoolSize 核心线程数则创建新的线程;如果大于 corePoolSize 核心线程数则加入等待队列。如果队列已满则无法加入,在线程数小于 maxinumPoolSize 指定的最大线程数前提下会创建新的线程来执行 , 如果线程数大 于 maxinumPoolSize 最大线程数则执行拒绝策略。

        ​​​​​​​        Java多线程-12(线程管理中的线程池)

        3) 无界任务队列,由 LinkedBlockingQueue 对象实现,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。当有新的任务时,在系统线程数小于 corePoolSize 核心线程数则创建新的线程来执行任务,当线程池中线程数量大于corePoolSize 核心线程数则把任务加入阻塞队列。

        ​​​​​​​        Java多线程-12(线程管理中的线程池)

  

        4) 优先任务队列是通过 PriorityBlockingQueue 实现的,是带有任务优先级的队列 , 是 一 个特殊的无界队列。 不管是 ArrayBlockingQueue 队列还是 LinkedBlockingQueue 队列都是按照 先进先出算法处理任务的。在 PriorityBlockingQueue 队列中可以根 据任务优先级顺序先后执行。

1.2.4 拒绝策略

        ThreadPoolExecutor 构造方法的最后一个参数指定了拒绝策略。当提交给线程池的任务量超过实际承载能力时,如何处理? 即线程池中的线程已经用完了,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题。

        JDK 提供了四种拒绝策略:

        AbortPolicy 策略,会抛出异常

        CallerRunsPolicy 策略,只要线程池没关闭,会在调用者线程中运行当前被丢弃的任务

        DiscardOldestPolicy 将任务队列中最老的任务丢弃,尝试再次提交新任务

        DiscardPolicy 直接丢弃这个无法处理的任务

        Executors 工具类提供的静态方法返回的线程池默认的拒绝策略是 AbortPolicy 抛出异常,如果内置的拒绝策略无法满足实际需求,可以扩展 RejectedExecutionHandler 接口 

例子:使用 ThreadPoolExecutor创建 固定大小为2的线程池 给他分配5个任务.

public class TestOne {     public static void main(String[] args) {                //使用 ThreadPoolExecutor创建 固定大小为2的线程池 使用了new SynchronousQueue<>()表示当线程池线程都在工作时,新的任务直接拒绝         ThreadPoolExecutor threadPool=new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new RejectedExecutionHandler() {             @Override             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                 //r就是请求任务,executor就是当前线程池                 System.out.println(r+"线程池被拒绝了");             }         });          //创建5个任务分配给线程池         for (int i=0;i<5;i++){             //执行任务             threadPool.execute(new Runnable() {                 @Override                 public void run() {                     System.out.println(Thread.currentThread().getId()+"线程执行了任务....");                     try {                         Thread.sleep(2000);                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                 }             });         }      }  } 

结果:

        Java多线程-12(线程管理中的线程池)

 分析:

        我们只在线程池中创建了最大数量为 2 的线程 ,使用了new SynchronousQueue<>()的直接提交工作队列表示当线程吃的线程都在工作时,新的任务直接拒绝,所以当5个任务进入线程池时,只有两个可以被线程池中的线程执行,剩下的3个任务直接被拒绝了

1.2.5  ThreadFactory

        线程池中的线程从哪儿来的? 答案就是 ThreadFactory。

        ThreadFactory 是一个接口,只有一个用来创建线程的方法: Thread  newThread(Runnable r); 当线程池中需要创建线程时就会调用该方法

        ​​​​​​​        ​​​​​​​        ​​​​​​​        Java多线程-12(线程管理中的线程池)

例子:创建线程池时重写 线程工厂 :将创建的线程设值为 守护线程 

public class TestOne {     public static void main(String[] args) throws InterruptedException {          //定义任务         Runnable r=new Runnable() {             @Override             public void run() {                 System.out.println(Thread.currentThread().getId()+"线程正在执行任务....");                 while (true){                     //模拟任务                  }              }         };          ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 //根据参数r任务,创建一个线程来执行任务                 Thread thread=new Thread(r);                 System.out.println("线程池创建了守护线程"+thread.getId());                 //将线程设置为守护线程                 thread.setDaemon(true);                 return thread;             }         });           //提交 5 个任务, 当给当前线程池提交的任务超过 5 个时,线程池默认抛出异常         for (int i = 0; i < 5; i++) {            poolExecutor.execute(r);         }                   //主线程睡眠4s确保线程执行任务         Thread.sleep(4000);      }  }

 结果:

Java多线程-12(线程管理中的线程池)

1.2.6 监控线程池

        ThreadPoolExecutor 提供了一组方法用于监控线程池,进而更好的获得线程池的内部状态,保障系统安全

        ​​​​​​​        Java多线程-12(线程管理中的线程池)

        int getActiveCount() 获得线程池中当前活动线程的数量

        long getCompletedTaskCount() 返回线程池完成任务的数量

        int getCorePoolSize() 线程池中核心线程的数量

        int getLargestPoolSize() 返回线程池曾经达到的线程的最大数

        int getMaximumPoolSize() 返回线程池的最大容量

        int getPoolSize() 当前线程池的大小

        BlockingQueue getQueue() 返回阻塞队列 long

        getTaskCount() 返回线程池收到的任务总数  

例子 :使用上述方法来监控线程池

public class TestOne {     public static void main(String[] args) throws InterruptedException {          //定义任务         Runnable runnable=new Runnable() {             int i=0;             @Override             public void run() {                 //模拟正在执行任务                 System.out.println(" ");              }         };           //创建线程池         ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,5,0,TimeUnit.SECONDS,new SynchronousQueue<>(),new ThreadPoolExecutor.AbortPolicy());           //创建10个任务         for (int i=0;i<5;i++){             System.out.println("提交第"+(i+1)+"次任务===============================");             poolExecutor.execute(runnable);             System.out.println("线程池中核心线程的数量:"+poolExecutor.getCorePoolSize());             System.out.println("线程池中最大容量:"+poolExecutor.getMaximumPoolSize());             System.out.println("线程池的活动线程数量:"+poolExecutor.getActiveCount());             System.out.println("当前线程池的大小:"+poolExecutor.getPoolSize());             System.out.println("线程池玩成任务的数量:"+poolExecutor.getCompletedTaskCount());             System.out.println("线程池收到的总任务数:"+poolExecutor.getTaskCount());             Thread.sleep(2000);         }        }  } 

结果:

Java多线程-12(线程管理中的线程池)

         Java多线程-12(线程管理中的线程池)

 Java多线程-12(线程管理中的线程池)

1.2.7 优化线程池大小

        线程池大小对系统性能是有一定影响的,过大或者过小都会无法发挥最优的系统性能,,线程池大小不需要非常精确,只要避免极大或 者极小的情况即可,一般来说,线程池大小需要考虑 CPU 数量,内存大小等因素。

        在书中给出一个估算线程池大小的公式:

                线程池大小 = CPU 的数量 * 目标 CPU 的使用率*( 1 + 等待时间 与计算时间的比)

1.2.8 线程池死锁

         如果在线程池中执行的 任务 A 在执行过程中又向线程池提交了任务 B, 任务 B 添加到了线程池的等待队列中,如果任务 A 的结束需要等待任务 B 的执行结果。就有可能会出现这种情况: 线程池中所有的工作线程都处于等待任务处理结果,而这些任务在阻塞队列中等待执行, 线程池中没有可以对阻塞队列中的任务进行处理的线程,这种 等待会一直持续下去,从而造成死锁。

        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​        Java多线程-12(线程管理中的线程池)

         适合给线程池提交相互独立的任务,而不是彼此依赖的任务。对于彼此依赖的任务,可以考虑分别提交给不同的线程池来执行。 

1.2.9   异常简单处理

在使用 ThreadPoolExecutor 进行 submit 提交任务时,有的任务抛出 了异常,但是线程池并没有进行提示,即线程池把任务中的异常给吃掉 了,可以把 submit 提交改为 execute执行

例子:使用线程池执行4个 数学计算任务  x/y  的结果

public class TestOne {      //定义内部任务类 计算x/y的和     static class MyRun implements Runnable{         private int x;         private int y;         private float result;         public MyRun(int x,int y){             this.x=x;             this.y=y;         }          @Override         public void run() {             result=x/y;             System.out.println("输入X="+x+" y="+y+"任务结果为: "+result);         }     }      public static void main(String[] args) throws InterruptedException {           //创建线程池         ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,5,0,TimeUnit.SECONDS,new LinkedBlockingDeque<>(),new ThreadPoolExecutor.AbortPolicy());           //创建4个任务              poolExecutor.submit(new MyRun(0,5));             poolExecutor.submit(new MyRun(5,1));             poolExecutor.submit(new MyRun(5,2));             poolExecutor.submit(new MyRun(5,0));          }        } 

结果:

        可以发现 有四个任务 但是 计算完只显示了3个结果  因为 一个 任务是 5/0这是不被允许的,出现了异常 By Zero  ,但是使用了submit的提交方式 异常被隐藏了,我们看不到

Java多线程-12(线程管理中的线程池)

这里我们只需要把 submit 改为 execute就好

 Java多线程-12(线程管理中的线程池)

 结果: 成功抛出了异常

Java多线程-12(线程管理中的线程池)

1.2.10 ForkJoinPool 线程池

        “分而治之” 是一个有效的处理大数据的方法,著名的大数据基石 MapReduce 就是采用这种分而治之的思路。简单点说,如果要处理的 1000 个数据,但是我们不具备处理1000个数据的能力,可以只处理10个数据,可以把这 1000 个数据分阶段处理 100 次,每次处理 10 个,把 100 次的处理结果进行合成,形成最后这 1000 个数据的处理结果。

        把一个大任务调用 fork()方法分解为若干小的任务,把小任务的处 理结果进行 join()合并为大任务的结果 

        ​​​​​​​        Java多线程-12(线程管理中的线程池)

         系统对 ForkJoinPool 线程池进行了优化,提交的任务数量与线程的数量不一定是一对一关系。在多数情况下,一个物理线程实际上需要处理多个逻辑任务。也就是说 有的线程可能处理一个任务,有的线程可能要处理多个任务

Java多线程-12(线程管理中的线程池)

        ForkJoinPool 线程池中最常用的方法是:<T> ForkJoinTask 类中的 <T>submit(ForkJoinTask task), 向线程池提交一个 ForkJoinTask 任务。ForkJoinTask 任务支持 fork()分解与join()等待的任务。

        ForkJoinTask 有 两 个 重 要 的 子 类 :RecursiveAction 和 RecursiveTask ,它们的区别在于 RecursiveAction 任务没有返回值, RecursiveTask 任务可以带有返回值

例子: 这个例子是借鉴了别人了,注解写的很详细,有需要自取

public class TestOne {     //计算数列的和, 需要返回结果,可以定义任务继承 RecursiveTask     private static class CountTask extends RecursiveTask<Long> {         private static final int THRESHOLD = 10000; //定义数据规模的阈值,允许计算10000 个数内的和,超过该阈值的数列就需要分解         private static final int TASKNUM = 100; //定义每次把大任务分解为 100 个小任务         private long start; //计算数列的起始值         private long end; //计算数列的结束值          public CountTask(long start, long end) {             this.start = start;             this.end = end;         }          //重写 RecursiveTask 类的 compute()方法,计算数列的结果         @Override         protected Long compute() {             long sum = 0; //保存计算的结果              //判断任务是否需要继续分解,如果当前数列 end 与 start 范围的数超过阈值THRESHOLD,就需要继续分解             if (end - start < THRESHOLD) {                 //小于阈值可以直接计算                 for (long i = start; i <= end; i++) {                     sum += i;                 }              } else { //数列范围超过阈值,需要继续分解                 //约定每次分解成 100 个小任务,计算每个任务的计算量                 long step = (start + end) / TASKNUM;                 //start = 0 , end = 200000, step = 2000, 如果计算[0,200000]范围内数列的和, 把该范围的数列分解为 100 个小任务,每个任务计算 2000 个数即可                 //注意,如果任务划分的层次很深,即 THRESHOLD 阈值太小,每个任务的计算量很小,层次划分就会很深,可能出现两种情况:一是系统内的线程数量会越积越多,导致性能下降严重; 二是分解次数过多,方法调用过多可能会导致栈溢出                  //创建一个存储任务的集合                 ArrayList<CountTask> subTaskList = new ArrayList<>();                 long pos = start; //每个任务的起始位置                  for (int i = 0; i < TASKNUM; i++) {                     long lastOne = pos + step; //每个任务的结束位                     //调整最后一个任务的结束位置                     if (lastOne > end) {                         lastOne = end;                     }                     //创建子任务                     CountTask task = new CountTask(pos, lastOne);                     //把任务添加到集合中                     subTaskList.add(task);                     //调用 for()提交子任务                     task.fork();                     //调整下个任务的起始位置                     pos += step + 1;                 }                  //等待所有的子任务结束后,合并计算结果                 for (CountTask task : subTaskList) {                     sum += task.join(); //join()会一直等待子任务执行完毕返回执行结果                 }             }             return sum;         }     }       public static void main(String[] args) {         //创建 ForkJoinPool 线程池         ForkJoinPool forkJoinPool = new ForkJoinPool();         //创建一个大的任务         CountTask task = new CountTask(0L, 200000L);         //把大任务提交给线程池         ForkJoinTask<Long> result = forkJoinPool.submit(task);         try{             Long res = result.get(); //调用任务的 get()方法返回结果             System.out.println("计算数列结果为:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         }         //验证         long s = 0L;         for (long i = 0; i <= 200000 ; i++) {             s += i;         }         System.out.println("检验结果为:"+s);     }    } 

结果:

Java多线程-12(线程管理中的线程池)

版权声明:玥玥 发表于 2021-08-13 0:28:31。
转载请注明:Java多线程-12(线程管理中的线程池) | 女黑客导航