FutureTask 源码分析

JDK源码学习

深入分析下 java.util.concurrent 包下 FutureTask 类

简单画了个UML图,可以看到FutureTask, CompletableFuture 都有实现 Future接口类

FutureTask 源码分析

FutureTask类

先来看Future的实现类 --> FutureTask间接实现Runnable,Future, 可作为一个任务被执行,也能获取计算结果

有些场景需要异步执行任务, 或子线程并行执行任务,此时就可用FutureTask来实现,可以阻塞获取返回值,也可轮询获取返回值

下面通过例子来分析源码:

public class FutureTest {     public static class Task implements Callable<String> {         @Override         public String call() {             String tid = String.valueOf(Thread.currentThread().getId());             System.out.printf("Thread#%s : in calln", tid);             return tid;         }     }      public static void main(String[] args) throws InterruptedException, ExecutionException              {         //新建一个任务放入线程池,获取其执行完的返回值         ExecutorService es = Executors.newFixedThreadPool(3);         Future future = es.submit(new Task());         System.out.println(future.get());     } }

向线程池提交一个有返回值任务, 返回FutureTask实例

抽象类AbstractExecutorService:     public <T> Future<T> submit(Callable<T> task) {         if (task == null) throw new NullPointerException();         RunnableFuture<T> ftask = newTaskFor(task);         execute(ftask);         return ftask;     }  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {         return new FutureTask<T>(callable);     }  FutureTask类: private Callable<V> callable; private volatile int state; public FutureTask(Callable<V> callable) {         if (callable == null)             throw new NullPointerException();         this.callable = callable;         this.state = NEW;       // ensure visibility of callable     } 

分析execute(ftask),此方法将任务放入线程池,在未来某个时间点执行任务

类ThreadPoolExecutor:  //运行状态存储在高三位中 private static final int RUNNING = -1 << COUNT_BITS; //ctl原子整数,存储着有效线程数和线程池状态,ctlOf方法通过或运算符计算, 初始化时工作线程数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); public void execute(Runnable command) {         if (command == null)             throw new NullPointerException();         }         int c = ctl.get();         //计算工作线程数是否小于核心线程数         if (workerCountOf(c) < corePoolSize) {             //创建核心线程执行任务             if (addWorker(command, true))                 return;             c = ctl.get();         }         //判断线程池是否是运行状态,并向工作队列添加一个任务         if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();             //线程池不处于运行状态,移除任务             if (! isRunning(recheck) && remove(command))                 reject(command);             else if (workerCountOf(recheck) == 0)                 //创建新线程                 addWorker(null, false);         }         else if (!addWorker(command, false))             //拒绝策略             reject(command);     }

继续分析addWorker方法,此方法创建工作线程执行任务

类ThreadPoolExecutor: private boolean addWorker(Runnable firstTask, boolean core) {         //标记循环的位置         retry:         for (;;) {             //返回池控制状态             int c = ctl.get();             //获取线程池状态             int rs = runStateOf(c);              // Check if queue empty only if necessary.             if (rs >= SHUTDOWN &&                 ! (rs == SHUTDOWN &&                    firstTask == null &&                    ! workQueue.isEmpty()))                 return false;              for (;;) {                 //计算工作线程数                 int wc = workerCountOf(c);                 if (wc >= CAPACITY ||                     wc >= (core ? corePoolSize : maximumPoolSize))                     return false;                 //cas操作递增工作线程数,当前数+1                 if (compareAndIncrementWorkerCount(c))                     //跳出循环标记位                     break retry;                 c = ctl.get();  // Re-read ctl                 if (runStateOf(c) != rs)                     continue retry;                 // else CAS failed due to workerCount change; retry inner loop             }         }          boolean workerStarted = false;         boolean workerAdded = false;         Worker w = null;         try {             //创建worker实例             w = new Worker(firstTask);             final Thread t = w.thread;             if (t != null) {                 final ReentrantLock mainLock = this.mainLock;                 mainLock.lock();                 try {                     // Recheck while holding lock.                     // Back out on ThreadFactory failure or if                     // shut down before lock acquired.                     int rs = runStateOf(ctl.get());                      if (rs < SHUTDOWN ||                         (rs == SHUTDOWN && firstTask == null)) {                         if (t.isAlive()) // precheck that t is startable                             throw new IllegalThreadStateException();                         //放入工作线程池集合                         workers.add(w);                         int s = workers.size();                         if (s > largestPoolSize)                             largestPoolSize = s;                         workerAdded = true;                     }                 } finally {                     mainLock.unlock();                 }                 if (workerAdded) {                     t.start();                     workerStarted = true;                 }             }         } finally {             //启动失败,移除worker, cas操作递减工作线程数,当前数-1             if (! workerStarted)                 addWorkerFailed(w);         }         return workerStarted;     }   private void addWorkerFailed(Worker w) {         final ReentrantLock mainLock = this.mainLock;         mainLock.lock();         try {             if (w != null)                 workers.remove(w);             decrementWorkerCount();             //尝试终止线程池             tryTerminate();         } finally {             mainLock.unlock();         }     }

通过上面的分析,可以看到任务放入了Worker对象中,然后启动了Worker里面的线程,那这个线程做了什么呢?
这个线程先执行创建Worker对象时提交的任务,然后取工作队列里的任务执行

类ThreadPoolExecutor:          final void runWorker(Worker w) {         Thread wt = Thread.currentThread();         //创建Worker对象时提交的任务         Runnable task = w.firstTask;         w.firstTask = null;         w.unlock(); // allow interrupts         boolean completedAbruptly = true;         try {             //getTask() 从任务池中拿一个任务             while (task != null || (task = getTask()) != null) {                 w.lock();                 // If pool is stopping, ensur thread is interrupted;                 // if not, ensure thread is not interrupted.  This                 // requires a recheck in second case to deal with                 // shutdownNow race while clearing interrupt                 if ((runStateAtLeast(ctl.get(), STOP) ||                      (Thread.interrupted() &&                       runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                     wt.interrupt();                 try {                     //子类可重写,前置增强                     beforeExecute(wt, task);                     Throwable thrown = null;                     try {                         //运行任务                         task.run();                     } catch (RuntimeException x) {                         thrown = x; throw x;                     } catch (Error x) {                         thrown = x; throw x;                     } catch (Throwable x) {                         thrown = x; throw new Error(x);                     } finally {                         afterExecute(task, thrown);                     }                 } finally {                     task = null;                     w.completedTasks++;                     w.unlock();                 }             }             completedAbruptly = false;         } finally {             processWorkerExit(w, completedAbruptly);         }     }      //从队列中取任务     private Runnable getTask() {         boolean timedOut = false; // Did the last poll() time out?          for (;;) {             int c = ctl.get();             int rs = runStateOf(c);              // Check if queue empty only if necessary.             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                 decrementWorkerCount();                 return null;             }              int wc = workerCountOf(c);              // 判断超时标志位             // 核心线程不允许超时,allowCoreThreadTimeOut 默认false             // wc > corePoolSize 判断当前线程数是否大于核心线程数             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;              if ((wc > maximumPoolSize || (timed && timedOut))                 && (wc > 1 || workQueue.isEmpty())) {                 //当前线程数-1, 线程退出                 if (compareAndDecrementWorkerCount(c))                     return null;                 continue;             }              try {                 //timed = true, 等待keepAliveTime 纳秒取任务                 //timed = false, 阻塞直到有可用任务                 Runnable r = timed ?                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                     workQueue.take();                 if (r != null)                     return r;                 //已超时,作为上面的if条件,用于判断此线程是否退出                 timedOut = true;             } catch (InterruptedException retry) {                 timedOut = false;             }         }     }

由于在 ThreadPoolExecutor 提交的是一个FutureTask 任务实现类,所以runWorker 里是调用 FutureTask.run() 方法来执行

类FutureTask:  public void run() {         if (state != NEW ||             !UNSAFE.compareAndSwapObject(this, runnerOffset,                                          null, Thread.currentThread()))             return;         try {             Callable<V> c = callable;             if (c != null && state == NEW) {                 V result;                 boolean ran;                 try {                     //执行call函数,也就是执行测试例子中的Task.call()方法                     result = c.call();                     ran = true;                 } catch (Throwable ex) {                     result = null;                     ran = false;                     setException(ex);                 }                 if (ran)                     //设置执行结果                     set(result);             }         } finally {             runner = null;             int s = state;             if (s >= INTERRUPTING)                 handlePossibleCancellationInterrupt(s);         }     }      protected void set(V v) {         //cas操作变更此任务状态为 COMPLETING         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {             //任务结果赋值给outcome, 后续future.get获取的就是outcome值             outcome = v;             //设置最终状态为 NORMAL             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state             //移除所有等待的线程并发出信号唤醒             finishCompletion();         }     }

上段代码中 set() 方法设置此任务最终的状态为 NORMAL, 表示此任务已正常执行完成。
接着 finishCompletion() 方法 遍历所有等待的节点,并发送信号唤醒线程

private void finishCompletion() {         // assert state > COMPLETING;         for (WaitNode q; (q = waiters) != null;) {             if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                 for (;;) {                     Thread t = q.thread;                     if (t != null) {                         q.thread = null;                         //唤醒等待的线程,此处唤醒awaitDone()方法中LockSupport.park阻塞的线程                         LockSupport.unpark(t);                     }                     WaitNode next = q.next;                     if (next == null)                         break;                     q.next = null; // unlink to help gc                     q = next;                 }                 break;             }         }         done();         callable = null;        // to reduce footprint     }

最终我们来看future.get() 是怎么执行的

类FutureTask:  public V get() throws InterruptedException, ExecutionException {         int s = state;         if (s <= COMPLETING)             //任务未执行完成,进行阻塞             s = awaitDone(false, 0L);         return report(s);     }  private int awaitDone(boolean timed, long nanos)         throws InterruptedException {         final long deadline = timed ? System.nanoTime() + nanos : 0L;         WaitNode q = null;         boolean queued = false;         for (;;) {             //线程中断,则从链表中移除节点,并抛出中断异常             if (Thread.interrupted()) {                 removeWaiter(q);                 throw new InterruptedException();             }              int s = state;             //任务完成、取消、异常等,则返回当前任务状态             if (s > COMPLETING) {                 if (q != null)                     q.thread = null;                 return s;             }             else if (s == COMPLETING) // cannot time out yet                 //让出cpu,使当前线程从运行状态变成就绪状态,和其它线程一同竞争cpu                 Thread.yield();             else if (q == null)                 //创建等待节点                 q = new WaitNode();             else if (!queued)                 //cas操作更新waiters的头节点为q, 完成入队                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                      q.next = waiters, q);             else if (timed) {                 nanos = deadline - System.nanoTime();                 if (nanos <= 0L) {                     //到了设置的超时时间,则移除等待队列                     removeWaiter(q);                     return state;                 }                 //阻塞nacos纳秒数                 LockSupport.parkNanos(this, nanos);             }             else                 //阻塞,等待唤醒                 LockSupport.park(this);         }     }  private V report(int s) throws ExecutionException {         //此处x的值就是前面set方法赋值的outcome值         Object x = outcome;         if (s == NORMAL)             return (V)x;         if (s >= CANCELLED)             throw new CancellationException();         throw new ExecutionException((Throwable)x);     }

总结:

到这里Future类分析完了,基本通过这个例子可看到整个的执行流程,比如创建了FutureTask对像、创建了线程执行任务、控制状态的存储、怎么完成的回调等等。

 

 

 

 

 

版权声明:玥玥 发表于 2021-05-14 13:32:01。
转载请注明:FutureTask 源码分析 | 女黑客导航