public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 1.如果工作线程小于核心线程池数量,尝试新建一个工作线程执行任务addWorker。 addWorker将会自动检查线程池状态和工作线程数,以防在添加工作线程的过程中, 线程池被关闭。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 2.如果创建工作线程执行任务失败,则任务入队列,如果入队列成功, 我们仍需要二次检查线程池状态,以防在入队列的过程中,线程池关闭。 如果线程池关闭,则回滚任务。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. 如果任务入队列失败,则尝试创建一个工作线程执行任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); } //如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池已关闭,且工作线程为0,则创建一个空闲工作线程 addWorker(null, false); } //根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加 //工作线程线程执行任务,如果尝试失败,则拒绝任务处理 else if (!addWorker(command, false)) reject(command); }
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //如果当前工作线程数小于核心线程池数量,则添加新的工作线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); }
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread#start), we roll back cleanly. * 根据当前线程池状态和核心线程池数量与最大线程池数量,检查是否应该, 添加工作线程执行任务。如果应该添加工作线程,则更新工作线程数, 如果调整成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭, 则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于 线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。 * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //如果线程池已关闭或线程池正在关闭,提交的任务为null且任务队列不为空,则直接返回false //添加工作线程失败。 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //如果工作线程数量大于线程池容量, //或当前工作线程数量大于core(如果core,为true,则为corePoolSize,否则maximumPoolSize) return false; if (compareAndIncrementWorkerCount(c)) //CAS操作工作线程数,即原子操作工作线程数+1,成功则跳出自旋 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //如果在判断是否应该添加工作线程执行任务和CAS操作工作线程数, //线程状态改变,跳出本次自旋 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false;//工作线程是否开始 boolean workerAdded = false;//工作线程是否添加成功 Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果线程池是正在运行或线程池正在关闭,任务为null if (t.isAlive()) // precheck that t is startable //线程存活,抛出非法线程状态异常 throw new IllegalThreadStateException(); //添加工作线程,到工作线程集 workers.add(w); int s = workers.size(); if (s > largestPoolSize) //更新最大线程池数量 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加工作线程成功,则执行任务 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //执行任务失败,则回滚工作线程和工作线程数 addWorkerFailed(w); } return workerStarted; }
if (! workerStarted) //执行任务失败,则回滚工作线程和工作线程数 addWorkerFailed(w);
/** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) //从工作线程集移除工作线程 workers.remove(w); //工作线程数减-1 decrementWorkerCount(); //检查是否线程池关闭,关闭则执行相关工作 //这个我们在前面说过,则里简单回顾一下 tryTerminate(); } finally { mainLock.unlock(); } }
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { //自旋尝试关闭线程池 for (;;) { int c = ctl.get(); //如果线程池正在运行,或正在关闭且队列不为空,则返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate //如果工作线程不为空,则中断空闲工作线程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //执行结束工作 terminated(); } finally { //线程池已结束 ctl.set(ctlOf(TERMINATED, 0)); //唤醒等待线程池结束的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(ONLY_ONE); /** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * 中断等待任务的空闲非锁住状态的工作线程 * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. 如果onlyOne为true以为着,只中断最多一个空闲工作线程,这个在关闭线程池时, 调用或关闭的过程中,工作线程完成任务调用。 */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { //遍历工作线程集 Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲 try { //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) //如果是只中断一个空闲线程,则结束本次中断空闲线程任务 break; } } finally { mainLock.unlock(); } }
terminated();
/** * Method invoked when the Executor has terminated. Default * implementation does nothing. Note: To properly nest multiple * overridings, subclasses should generally invoke 待子类扩展 * {@code super.terminated} within this method. */ protected void terminated() { }
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false); }
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { //调用拒绝任务处理器处理任务 handler.rejectedExecution(command, this); }
//根据最大线程池数量,判断是否应该添加工作线程,如果当前工作线程数量小于最大线程池数量,则尝试添加 //工作线程线程执行任务,如果尝试失败,则拒绝任务处理 else if (!addWorker(command, false)) reject(command);
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//当前线程 Runnable task = w.firstTask;//工作线程任务 w.firstTask = null; //任务线程的锁状态默认为-1,此时解锁+1,变为0,即锁打开状态,允许中断,在任务未执行之前,不允许中断。 w.unlock(); // allow interrupts, boolean completedAbruptly = true;//完成后是否可以中断 try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //如果线程池正在Stop,则确保线程中断; //如果非处于Stop之后的状态,则判断是否中断,如果中断则判断线程池是否已关闭 //如果线程池正在关闭,但没有中断,则中断线程池 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行前工作 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行后工作 afterExecute(task, thrown); } } finally { task = null; //任务线程完成任务数量加1,释放锁 w.completedTasks++; w.unlock(); } } //任务已执行完不可以中断 completedAbruptly = false; } finally { //处理任务完成后的工作 processWorkerExit(w, completedAbruptly); } }
while (task != null || (task = getTask()) != null) {
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 如果线程池处于关闭之后或已关闭任务队列为空,则重置工作线程数 decrementWorkerCount(); return null;//返回null任务 } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); //如果线程池正在运行,根据是否允许空闲线程等待任务和 //当前工作线程与核心线程池数量比较值,判断是否需要超时等待任务 timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) //如果当前工作线程数,小于最大线程数,空闲工作线程不需要超时等待任务, //则跳出自旋,即在当前工作线程小于最大线程池的情况下,有工作线程可用, //任务队列为空。 break; if (compareAndDecrementWorkerCount(c)) //减少工作线程数量失败,返回null return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //如果与自旋前状态不一致,跳出本次自旋 continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //如果非超时则直接take,否则等待keepAliveTime时间,poll任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
//如果当前工作线程数大于核心线程池数量,检查运行状态,如果是正在运行,则添加任务到任务队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新检查线程池运行状态,如果线程池非处于运行状态,则移除任务 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功,则进行拒绝任务处理 else if (workerCountOf(recheck) == 0) //如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false); }
//如线程池非运行状态,且工作线程为0,则创建一个空闲工作线程 //即线程池正在关闭之后的状态,且任务队列不为空 addWorker(null, false);