JUC - 学习笔记

JUC

java.util.concurrent :concrrent包
java.util.concurrent.atomic :原子包
java.util.concurrent.;locks :锁lock包

lock 锁

package com.bin.concurrent;  import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  class Resources {     private int num = 30;     private Lock lock = new ReentrantLock();//可重入锁      public void subtractNum() {         lock.lock();         try {             if (num > 0) {                 System.out.println(Thread.currentThread().getName() + "售卖前:" + (num--) + "售卖后:num=" + num);             }         } catch (Exception e) {             e.printStackTrace();         } finally {             lock.unlock();         }     } }  /**  * 1.高内聚低耦合的情况下: 线程  操作(对外暴露的调用方法)  资源类  * 2.判断 / 干活  / 通知  * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if  *  * @Author: 邱成兵  * @Date: Created in 19:30 2021/4/27  */ public class SaleTicket {     public static void main(String[] args) {         Resources resources = new Resources();         new Thread(() -> {             for (int i = 0; i < 40; i++) resources.subtractNum();         }, "A").start();         new Thread(() -> {             for (int i = 0; i < 40; i++) resources.subtractNum();         }, "B").start();         new Thread(() -> {             for (int i = 0; i < 40; i++) resources.subtractNum();         }, "C").start();          /*new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 40; i++) {                     resources.subtractNum();                 }             }         },"A").start();          new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 40; i++) {                     resources.subtractNum();                 }             }         },"B").start();          new Thread(new Runnable() {             @Override             public void run() {                 for (int i = 0; i < 40; i++) {                     resources.subtractNum();                 }             }         },"C").start();*/     } }  
package com.bin.concurrent;  import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  class ShareResource {     private int number=1;//A:1 B:2 C:3     private Lock lock=new ReentrantLock();//可重入锁     private Condition condition1=lock.newCondition();     private Condition condition2=lock.newCondition();     private Condition condition3=lock.newCondition();     public void print5(){         lock.lock();         try {             //判断             if(number!=1){                 condition1.await();             }             //干活             for (int i = 1; i <=5 ; i++) {                 System.out.println(Thread.currentThread().getName()+"t"+i);             }             number=2;             //通知             condition2.signal();         }catch (Exception e){             e.printStackTrace();         }finally {             lock.unlock();         }     }      public void print10(){         lock.lock();         try {             //判断             if(number!=2){                 condition2.await();             }             //干活             for (int i = 1; i <=10 ; i++) {                 System.out.println(Thread.currentThread().getName()+"t"+i);             }             number=3;             //通知             condition3.signal();         }catch (Exception e){             e.printStackTrace();         }finally {             lock.unlock();         }     }      public void print15(){         lock.lock();         try {             //判断             if(number!=3){                 condition3.await();             }             //干活             for (int i = 1; i <=15 ; i++) {                 System.out.println(Thread.currentThread().getName()+"t"+i);             }             number=1;             //通知             condition1.signal();         }catch (Exception e){             e.printStackTrace();         }finally {             lock.unlock();         }     } }  /**  *  精准唤醒  * 1.高内聚低耦合的情况下: 线程  操作(对外暴露的调用方法)  资源类  * 2.判断 / 干活  / 通知  * 3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if  * 4.标志位  *  * @Author: 邱成兵  * @Date: Created in 11:03 2021/4/30  */ public class ThreadOrderAccess {     public static void main(String[] args) {         ShareResource shareResource=new ShareResource();         new Thread(() -> {             for (int i = 0; i < 10; i++) {                 shareResource.print5();             }         }, "A").start();         new Thread(() -> {             for (int i = 0; i < 10; i++) {                 shareResource.print10();             }         }, "B").start();         new Thread(() -> {             for (int i = 0; i < 10; i++) {                 shareResource.print15();             }         }, "C").start();     } }  
package com.bin.concurrent;  import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  class AirConditioner{     private int num=0;     private Lock lock=new ReentrantLock();//可重入锁     //代替 synchronized 里面的  wait notify notifyall 方法   awati  signal signalAll     private Condition condition=lock.newCondition();      public void addNum() throws InterruptedException {         lock.lock();         try {             while (num!=0){ //                wait();                 condition.await();             }             num++;             System.out.println(Thread.currentThread().getName() +"生产" +num); //            notify();             condition.signalAll();         }catch (Exception e){             e.printStackTrace();         }finally {             lock.unlock();         }     }       public void subtractNum() throws InterruptedException {         lock.lock();         try {             //判断             while(num==0){ //                wait();                 condition.await();             }             //干活             num--;             System.out.println(Thread.currentThread().getName() +"消费"+num); //            notify();             //通知             condition.signalAll();         }catch (Exception e){             e.printStackTrace();         }finally {             lock.unlock();         }     }       /**      *  老版本      *      **/     /*public synchronized void addNum() throws InterruptedException {         while (num>0){             wait();         }         num++;         System.out.println(Thread.currentThread().getName() +"生产" +num);         notify();     }      public synchronized void subtractNum() throws InterruptedException {         while(num<=0){             wait();         }         num--;         System.out.println(Thread.currentThread().getName() +"消费"+num);         notify();     }*/ } /**  * 交替打印出消费数字  *   2.判断 / 干活  / 通知  *   3.多线程交互中,必须要防止多线程的虚假唤醒,必须用while  不能用if  *  * @Author: 邱成兵  * @Date: Created in 11:18 2021/4/28  */ public class ThreadWaitNotifyDemo {     public static void main(String[] args) {         AirConditioner airConditioner=new AirConditioner();         new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     airConditioner.addNum();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }, "A").start();          new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     airConditioner.subtractNum();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }, "B").start();          new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     airConditioner.addNum();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }, "C").start();          new Thread(() -> {             try {                 for (int i = 0; i < 10; i++) {                     airConditioner.subtractNum();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }, "D").start();     } }  

多线程8锁

同步方法,锁是当前this
静态同步方法,锁是当前对象的.class
同步方法块,锁是()里传的对象,结束或异常必须释放锁
普通方法,互不影响

线程异常

java.util.concurrentModificationException 并发修改异常

Arraylist 如何线程安全

  1. 用vector集合
  2. Collections.JUC - 学习笔记

    集合扩容是一半 map扩容是一倍2*n次方 每次加1
    并发经验:如果能确认map的大小直接给确定值,避免后续再扩容操作

    安全的集合和map

    package com.bin.concurrent;  import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet;  /**  * 线程安全  * 1.故障现象: java.util.ConCurrentModificationException  * 2.导致原因: 线程不安全导致的  * 3.解决方案: 线程安全的集合或map  集合工具类 collections  并发包:concurrent   * 4.优化建议: (同样的错误 不出现第二次)  *   * @Author: qcb  * @Date: Created in 11:47 2021/5/6  */ public class NotSafeDemo {     public static void mapNotSafe() {         // 线程安全的map         Map<Object, Object> collections = new ConcurrentHashMap<>();//Collections.synchronizedMap()//new HashMap<>();         for (int i = 0; i < 30; i++) {             new Thread(() -> {                 collections.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0, 8));                 System.out.println(collections);             }, String.valueOf(i)).start();         }     }      public static void setNotSafe() {         // 线程安全的set         Set<String> collections = new CopyOnWriteArraySet(); //Collections.synchronizedSet(new HashSet<>())//new HashSet<>();          for (int i = 0; i < 30; i++) {             new Thread(() -> {                 collections.add(UUID.randomUUID().toString().substring(0, 8));                 System.out.println(collections);             }, String.valueOf(i)).start();         }     }      public static void listNotSafe(){         // 线程安全的list        List<String> collections= new CopyOnWriteArrayList();//Collections.synchronizedList()//new Vector<>();//new ArrayList<>();         for (int i = 0; i <30 ; i++) {             new Thread(() -> {                 collections.add(UUID.randomUUID().toString().substring(0,8));                 System.out.println(collections);             }, String.valueOf(i)).start();         }     } }  

    多线程

    • 创建多线程的方式:
    1. 传统有2种,java5之后增加了2种 总共4种
    JUC - 学习笔记

    callable 细节

    package com.bin.concurrent;  import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;  class MyThread implements Callable<Integer> {      @Override     public Integer call() throws Exception {         System.out.println("come in here");         return 1024;     } }  /**  * @Author: 邱成兵  * @Date: Created in 15:15 2021/5/6  *  获取多线程的方式  * 1.继承Thread  * 2.实现runnable  * 3.实现callable<>  * 4.线程池  */ public class CallableDemo {     public static void main(String[] args) throws ExecutionException, InterruptedException {         FutureTask futureTask = new FutureTask(new MyThread());         //实现原理   java 多态         new Thread(futureTask, "A").start();         //多次调用 走的是缓存  只会调用一次        //new Thread(futureTask, "B").start();          //获取结果一定要放到最后  不然会阻塞拿到结果在执行         System.out.println(futureTask.get());     }  }  

    辅助类

    JUC - 学习笔记

    CountDownLatch 辅助类-减少计数

    作用: 是一种通用的同步工具 等其他线程执行完了 主线程再关闭 关门走人
    原理:

    • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
    package com.bin.concurrent;  import java.util.concurrent.CountDownLatch;  /**  *  JUC辅助类  * @Author: 邱成兵  * @Date: Created in 16:09 2021/5/6  */ public class CountDownLatchDemo {     public static void main(String[] args) throws InterruptedException {         //减少计数  开始初始 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;         CountDownLatch countDownLatch=new CountDownLatch(6);         for (int i = 0; i < 6; i++) {             new Thread(() -> {                 System.out.println("第"+Thread.currentThread().getName()+"个同学出教室 /t");                 countDownLatch.countDown();             }, String.valueOf(i)).start();         }         //第二个是完成信号,允许司机等到所有的工作人员完成。         countDownLatch.await();         System.out.println("关门");     } }  

    CyclicBarrier 辅助类-循环栅栏 到达共同屏障点的同步辅助

    允许一组线程全部等待彼此达到共同屏障点的同步辅助 集齐7龙珠
    原理:

    • CyclicBarrier * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是, * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞, * 直到最后一个线程到达屏障时,屏障才会开门,所有 * 被屏障拦截的线程才会继续干活。 * 线程进入屏障通过CyclicBarrier的await()方法。
    package com.bin.concurrent;  import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;  /**  *   允许一组线程全部等待彼此达到共同屏障点的同步辅助  *  * @Author: 邱成兵  * @Date: Created in 16:53 2021/5/6  */ public class CyclicBarrierDemo {     public static void main(String[] args) {         //参数1:多少个线程跳闸   参数2:跳闸之后执行的线程         CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{             System.out.println("集齐7颗龙珠");         });         for (int i = 0; i < 7; i++) {             final int temInt=i;             new Thread(() -> {                 System.out.println(Thread.currentThread().getName()+" /t 第" +temInt+"颗龙珠");                 try {                     cyclicBarrier.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 } catch (BrokenBarrierException e) {                     e.printStackTrace();                 }             }, String.valueOf(i)).start();         }      } }  

    semaphore -辅助类-信号灯

    可做秒杀线程数量控制 不管前端请求多少个 只接受固定能接受的数量
    主要用于并发的控制可资源的互斥 限流
    如果设置semaphore 为1 等同于 synchronized (场景 一个线程持有一个资源多久 可以用这个实现)
    原理:
    在信号量上我们定义两种操作: * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1), * 要么一直等下去,直到有线程释放信号量,或超时。 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。 * * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

    package com.bin.concurrent;  import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;  /**  *  信号等  *  用户并发的控制和资源的互斥   限流  * @Author: 邱成兵  * @Date: Created in 17:15 2021/5/6  */ public class SemaphoreDemo {     public static void main(String[] args) {         //模拟3个车位         Semaphore semaphore=new Semaphore(3);         for (int i = 0; i < 6; i++) {             new Thread(() -> {                 //资源减一                 try {                     semaphore.acquire();                     System.out.println(Thread.currentThread().getName()+"/t 抢到了车位");                     try {                         TimeUnit.SECONDS.sleep(3);                     }catch (Exception e){                      }                     System.out.println(Thread.currentThread().getName()+"/t 退出停车位");                 } catch (InterruptedException e) {                     e.printStackTrace();                 }finally {                     //释放资源                     semaphore.release();                 }             }, String.valueOf(i)).start();         }     } }  

    ReadWriteLock 读写锁 (场景:缓存的读写)

    读读能共存 读写不能共存 写写不能共存

    package com.bin.concurrent;  import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;  class MyCache{     private volatile Map<String,Object> map=new HashMap<>();     //读写锁     private ReadWriteLock lock=new ReentrantReadWriteLock();      public void put(String key,Object value){         try {             lock.writeLock().lock();             System.out.println(key+" /t 开始写入数据");             TimeUnit.MILLISECONDS.sleep(3);             map.put(key,value);             System.out.println(key+" /t 写入完成"+value);         }catch (Exception e){             e.printStackTrace();         }finally {             lock.writeLock().unlock();         }     }      public void get(String key) {         try {             lock.readLock().lock();             System.out.println(key+" /t 开始读数据");             TimeUnit.MILLISECONDS.sleep(3);             Object o = map.get(key);             System.out.println(key+" /t 读取读数据"+o);         }catch (Exception e){             e.printStackTrace();         }finally {             lock.readLock().unlock();         }      } }  /**  *  读写锁  * @Author: 邱成兵  * @Date: Created in 10:32 2021/5/7  */ public class ReadWriteLockDemo {     public static void main(String[] args) {         MyCache myCache=new MyCache();         for (int i = 0; i < 5; i++) {             myCache.put(i+"",i+"");         }         for (int i = 0; i < 5; i++) {             myCache.get(i+"");         }     } }  

    BlockingQueue - 阻塞队列

    不用手动阻塞、唤醒线程了 (wait/await notify/signal notifyall/signalAll) BlockingQueue全部自动实现了

    JUC - 学习笔记
    JUC - 学习笔记

    核心方法

    JUC - 学习笔记
    package com.bin.concurrent;   import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;  /**  *  阻塞队列  * @Author: 邱成兵  * @Date: Created in 14:35 2021/5/7  */ public class BlockingQueueDemo {     public static void main(String[] args) throws InterruptedException {         BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(3);         blockingQueue.add("a");//添加  超过界限会报异常  queue full :队列已满         blockingQueue.remove();//移除先进的第一位  没有会报异常         blockingQueue.element();//检查 没有会报异常          blockingQueue.offer("a"); //超出返回false         blockingQueue.poll(); //没有移除的返回 null         blockingQueue.peek();//检查 没有返回null          blockingQueue.put("a");//添加  超出会阻塞         blockingQueue.take();//移除  没有会阻塞          blockingQueue.offer("a",3l, TimeUnit.SECONDS);//超出多少时间后直接退出         blockingQueue.poll(3l, TimeUnit.SECONDS);//超出多少时间后直接退出     } }  

    线程池

    例子:10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。

    线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

    它的主要特点为:线程复用;控制最大并发数;管理线程。

    第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
    第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
    第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    JUC - 学习笔记
    package com.bin.concurrent;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;  /**  *  线程池三大主流使用方法  * @Author: 邱成兵  * @Date: Created in 15:59 2021/5/7  */ public class ThreadDemo {     public static void main(String[] args) {         //执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程 //        ExecutorService executorService= Executors.newFixedThreadPool(5);         //一个任务一个任务的执行,一池一线程 //        ExecutorService executorService= Executors.newSingleThreadExecutor();         //执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强         ExecutorService executorService= Executors.newCachedThreadPool();         try {             for (int i = 1; i <= 10; i++) {                 final int teamInt=i;                 executorService.execute(()->{                     System.out.println(Thread.currentThread().getName()+" /t 办理业务"+teamInt);                 });             }         }catch (Exception e){             e.printStackTrace();         }finally {             executorService.shutdown();         }     } }  

    7大参数详解

    1、corePoolSize:线程池中的常驻核心线程数

    2、maximumPoolSize:线程池中能够容纳同时
    执行的最大线程数,此值必须大于等于1

    3、keepAliveTime:多余的空闲线程的存活时间
    当前池中线程数量超过corePoolSize时,当空闲时间
    达到keepAliveTime时,多余线程会被销毁直到
    只剩下corePoolSize个线程为止

    4、unit:keepAliveTime的单位

    5、workQueue:任务队列,被提交但尚未被执行的任务

    6、threadFactory:表示生成线程池中工作线程的线程工厂,
    用于创建线程,一般默认的即可

    7、handler:拒绝策略,表示当队列满了,并且工作线程大于
    等于线程池的最大线程数(maximumPoolSize)时如何来拒绝
    请求执行的runnable的策略

    JUC - 学习笔记

    线程池底层工作原理

    JUC - 学习笔记
    JUC - 学习笔记

    工作底层原理步骤

    1. 提交任务
    2. 判断核心线程是否满 满:进入队列
    3. 判断阻塞队列是否满 满:创建主线程 (一个一个的扩)
    4. 判断主线程是否满 满:拒绝策略

    1、在创建了线程池后,开始等待请求。
    2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
    2.1、如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
    2.2、如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
    2.3、如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    2.4、如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
    3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
    4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

    工作中常用创建的线程池方法

    在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?
    一个都不用,我们工作中只能使用自定义的
    JUC - 学习笔记
    OOM(OutOfMemoryError? -JAVA内存溢出 虚拟机暴露故障

    ExecutorService executorService=new ThreadPoolExecutor(2,//核心                 5,//主线程                 2L,//过期时间                 TimeUnit.SECONDS,//过期单位                 new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小                 Executors.defaultThreadFactory(), //线程工厂  一般都是用默认的                 new ThreadPoolExecutor.AbortPolicy());//拒绝策略 

    4大拒绝策略

    1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
    2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
    3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
    4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

    最大线程数该如何设置?

    public static void main(String[] args) {         //cpu的数量         int cpuNum = Runtime.getRuntime().availableProcessors();         //cpu密集型   内核数+1或+2         //IO密集型    1/总核数/阻塞系数          ExecutorService executorService=new ThreadPoolExecutor(2,//核心                 5,//主线程                 2L,//过期时间                 TimeUnit.SECONDS,//过期单位                 new LinkedBlockingQueue<>(3),//队列类型 和 队列的大小                 Executors.defaultThreadFactory(), //线程工厂  一般都是用默认的                 new ThreadPoolExecutor.AbortPolicy());//拒绝策略  默认抛出异常     } 

    一:CPU密集型:

    定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。
    这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。(或者加1到2)

    特点:

    01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用

    02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了

    03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。

    package pool;  import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;  public class Demo02 {     public static void main(String[] args) {         //自定义线程池! 工作中只会使用 ThreadPoolExecutor          /**          * 最大线程该如何定义(线程池的最大的大小如何设置!)          * 1、CPU  密集型,几核,就是几,可以保持CPU的效率最高!          */          //获取电脑CPU核数         System.out.println(Runtime.getRuntime().availableProcessors());    //8核          ThreadPoolExecutor threadPool = new ThreadPoolExecutor(                 2,                                        //核心线程池大小                 Runtime.getRuntime().availableProcessors(),   //最大核心线程池大小(CPU密集型,根据CPU核数设置)                 3,                                       //超时了没有人调用就会释放                 TimeUnit.SECONDS,                             //超时单位                 new LinkedBlockingDeque<>(3),                 //阻塞队列                 Executors.defaultThreadFactory(),             //线程工厂,创建线程的,一般不用动                 new ThreadPoolExecutor.AbortPolicy());        //银行满了,还有人进来,不处理这个人的,抛出异常          try {             //最大承载数,Deque + Max    (队列线程数+最大线程数)             //超出 抛出 RejectedExecutionException 异常             for (int i = 1; i <= 9; i++) {                 //使用了线程池之后,使用线程池来创建线程                 threadPool.execute(()->{                     System.out.println(Thread.currentThread().getName()+" ok");                 });             }         } catch (Exception e) {             e.printStackTrace();         } finally {             //线程池用完,程序结束,关闭线程池             threadPool.shutdown();      //(为确保关闭,将关闭方法放入到finally中)         }     } } 

    二:IO密集型:

    定义:IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。

    我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。

    01:磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。

    02:网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

    IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU
    时间片让出去,直到缓冲区写满。

    既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):

    CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU
    空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU
    时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中

    package pool;  import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;  public class Demo02 {     public static void main(String[] args) {         //自定义线程池! 工作中只会使用 ThreadPoolExecutor          /**          * 最大线程该如何定义(线程池的最大的大小如何设置!)          * 2、IO   密集型  >判断你程序中十分耗IO的线程          *      程序    15个大型任务   io十分占用资源!  (最大线程数设置为30)          *      设置最大线程数为十分耗io资源线程个数的2倍          */          //获取电脑CPU核数         System.out.println(Runtime.getRuntime().availableProcessors());   //8核          ThreadPoolExecutor threadPool = new ThreadPoolExecutor(                 2,                               //核心线程池大小                 16,                     //若一个IO密集型程序有15个大型任务且其io十分占用资源!(最大线程数设置为 2*CPU 数目)                 3,                                //超时了没有人调用就会释放                 TimeUnit.SECONDS,                 //超时单位                 new LinkedBlockingDeque<>(3),     //阻塞队列                 Executors.defaultThreadFactory(),               //线程工厂,创建线程的,一般不用动                 new ThreadPoolExecutor.DiscardOldestPolicy());  //队列满了,尝试和最早的竞争,也不会抛出异常          try {             //最大承载数,Deque + Max    (队列线程数+最大线程数)             //超出 抛出 RejectedExecutionException 异常             for (int i = 1; i <= 9; i++) {                 //使用了线程池之后,使用线程池来创建线程                 threadPool.execute(()->{                     System.out.println(Thread.currentThread().getName()+" ok");                 });             }         } catch (Exception e) {             e.printStackTrace();         } finally {             //线程池用完,程序结束,关闭线程池             threadPool.shutdown();      //(为确保关闭,将关闭方法放入到finally中)         }     } }   

    总结:

    1:高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换

    2:并发不高、任务执行时间长的业务这就需要区分开看了:

    a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务

    b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,线程池中的线程数设置得少一些,减少线程上下文的切换

    (其实从一二可以看出无论并发高不高,对于业务中是否是cpu密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)

    3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,我们的项目使用的时redis作为缓存(这类非关系型数据库还是挺好的)。增加服务器是第二步(一般政府项目的首先,因为不用对项目技术做大改动,求一个稳,但前提是资金充足),至于线程池的设置,设置参考
    2
    。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件(任务时间过长的可以考虑拆分逻辑放入队列等操作)对任务进行拆分和解耦。

    三.:总结:

    01:一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8
    个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU
    核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。

    02:如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候
    CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起
    CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。

    最大线程数量 引用地址

    java.util.function 函数式接口

    4大函数式接口

    JUC - 学习笔记
    1. Function<T,R> 函数型接口 特点:传入T 有一个返回R
    /*Function<String,Integer> function=new Function<String, Integer>() {             @Override             public Integer apply(String s) {                 return 1024;             }         };*/         //新写法 lambda 表达式写法 //        Function<String,Integer> function=(s) -> {return 1024;}; 		//一个参数可以省略()  返回可以省略{return }         Function<String,Integer> function=s -> 1024;         System.out.println(function.apply("a")); 
    1. Predicate 断定型接口 特点:返回boolean
    /*Predicate<String> predicate=new Predicate<String>() {             @Override             public boolean test(String s) {                 return false;             }         };*/         //lambda 表达式 //        Predicate<String> predicate=s -> s.isEmpty(); //lambada 表达式+方法的引用         Predicate<String> predicate=String::isEmpty;         System.out.println(predicate.test("qcb")); 
    1. Consumer 消费型接口 特点:没有返回值 有参数
    /*Consumer<String> consumer=new Consumer<String>() {             @Override             public void accept(String s) {                 System.out.println(s);             }         };*/         //lambda 表达式 //        Consumer consumer= s -> System.out.println(s);         Consumer consumer= System.out::print;         consumer.accept("a"); 
    1. Supplier 宫给型接口 特点:无参数 有返回值
    /*Supplier<String> supplier=new Supplier<String>() {             @Override             public String get() {                 return "aa";             }         };*/         //lambda 表达式 //        Supplier<String> supplier=() -> {return "aa";};         Supplier<String> supplier=() -> "aa";         System.out.println(supplier.get()); 

    java.util.stream

    流(Stream) 到底是什么呢?
    是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。“集合讲的是数据,流讲的是计算!”

    特点:
    1. Stream 自己不会存储元素
    2. Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
    3. Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
    阶段:
    1. 创建一个Stream:一个数据源(数组、集合)
    2. 中间操作:一个中间操作,处理数据源数据
    3. 终止操作:一个终止操作,执行中间操作链,产生结果
    //题目:请按照给出数据,找出同时满足 * //偶数ID且年龄大于24且用户名转为大写且用户名字母倒排序 * // 最后只输出一个用户名字 User u1 = new User(11, "a", 23);         User u2 = new User(12, "b", 24);         User u3 = new User(13, "c", 22);         User u4 = new User(14, "d", 28);         User u5 = new User(16, "e", 26);         List<User> list = Arrays.asList(u1, u2, u3, u4, u5);         list.stream().filter(user -> {             return user.getId() % 2 == 0 && user.getAge() > 24;  //id偶数 并且 大于24的         })./*filter(user -> {             return user.getAge() > 24;             //Function<T,R> mapper)         }).*/map(user -> {             return user.getUserName().toUpperCase(); //把名称转为大写         }).sorted(/*(user1, user2) -> {             return user2.compareTo(user1);      //倒叙         }*/Collections.reverseOrder()).limit(1).forEach(System.out::println);//只查一条 

    ForkJoinPool 分支合并框架

    原理:Fork:把一个复杂任务进行分拆,大事化小Join:把分拆任务的结果进行合并

    ForkJoinPool:分支合并池 类比=> 线程池

    ForkJoinTask:ForkJoinTask 类比=> FutureTask

    RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务

    @Override     protected Integer compute() {         if((end-begin)<=ADJUST_VALUE){             for (int i = begin; i <= end; i++) {                 result=result+i;             }         }else {             int middle = (end + begin) / 2;             MyTask myTask1=new MyTask(begin,middle); //分支1             MyTask myTask2=new MyTask(middle+1,end);//分支2             myTask1.fork();//开启分支1             myTask2.fork();//开启分支2             result=myTask1.join()+myTask2.join();//合并         }         return result;     } 
    JUC - 学习笔记
    package com.bin.concurrent;  import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;  //RecursiveTask 继承ForkJoinTask 继承 Future 可以调用callable接口 返回计算结果 class MyTask extends RecursiveTask<Integer> {     private static final Integer ADJUST_VALUE=10;     private int begin;     private int end;     private int result;      public MyTask(Integer begin, Integer end) {         this.begin = begin;         this.end = end;     }      @Override     protected Integer compute() {         if((end-begin)<=ADJUST_VALUE){             for (int i = begin; i <= end; i++) {                 result=result+i;             }         }else {             int middle = (end + begin) / 2;             MyTask myTask1=new MyTask(begin,middle);             MyTask myTask2=new MyTask(middle+1,end);             myTask1.fork();             myTask2.fork();             result=myTask1.join()+myTask2.join();         }         return result;     } }   /**  *  分支合并框架  * @Author: 邱成兵  * @Date: Created in 18:04 2021/5/8  */ public class ForkJoinPollDemo {     public static void main(String[] args) throws ExecutionException, InterruptedException {         MyTask myTask=new MyTask(0,100);         ForkJoinPool forkJoinPool=new ForkJoinPool();         ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);         System.out.println(submit.get());         forkJoinPool.shutdown();     } }  

    CompletableFutrue - 异步调用

    package com.bin.concurrent;  import java.util.concurrent.CompletableFuture;  /**  * 异步调用  *  * @Author: 邱成兵  * @Date: Created in 14:37 2021/5/11  */ public class CompletableFutureDemo {     public static void main(String[] args) throws Exception {         /*CompletableFuture<Void> completableFuture= CompletableFuture.runAsync(new Runnable() {             @Override             public void run() {                 System.out.println("a");             }         });*/         //执行异步调用没有返回参数的         CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {             System.out.println("没有返回值cccccc");         });         completableFuture.get();          /*CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(new Supplier<Integer>() {             @Override             public Integer get() {                 return 1024;             }         });*/         //执行异步调用有返回参数的         CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { //            int a=1024/0;             return 1024;         });         System.out.println(completableFuture1.whenComplete((t, d) -> {             System.out.println("t:" + t);             System.out.println("d:" + d);         }).exceptionally(f -> {             System.out.println(f.getMessage());             return 4444;         }).get());     } }  

    原理:https://blog.

版权声明:玥玥 发表于 2021-05-14 23:01:38。
转载请注明:JUC - 学习笔记 | 女黑客导航