publicvoidexecute(Runnable command){ // ctl的值代表当前线程池中的线程数 int c = ctl.get(); // 如果当前线程数 < 核心线程数,直接创建新线程addWorker执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池正常,进行任务入队等待 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //如果线程池不处于运行态,则从队列中移除然后执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null, false); } //如果入队失败(已满)则判断是否<maximumSize,小于则创建新的线程执行任务,否则执行拒绝策略 elseif (!addWorker(command, false)) reject(command); }
//包装任务并创建线程执行任务 privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// 检查线程池状态、任务是否为空、任务等待队列是否为空等 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse; //自旋检查wc for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownew IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWoker()://mainLock.lock。 //把任务Runnable包成Worker,new Worker时会使用线程工厂创建新的线程 this.thread = getThreadFactory().newThread(this); w = new Worker(firstTask); //加入线程池中 workers.add(w); // 启动线程 w.thread.start()//实际上会调用worker的runWorker方法 //线程启动失败会从池子中删除线程,并ctl--,mainLock.lock。 addWorkerFailed(w);
Worker://Worker继承了AQS,实现了Runnable接口 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ publicvoidrun(){ runWorker(this); }