ThreadPoolExecutor解析三(线程池履行提交使命)ITeye - AG环亚娱乐集团

ThreadPoolExecutor解析三(线程池履行提交使命)ITeye

2019-01-11 06:07:24 | 作者: 水风 | 标签: 线程,使命,履行 | 浏览: 1339

Executor接口的界说:接口界说:接口界说:解析:接口界说:解析:解析:接口界说:解析一(中心线程池数量、线程池状况等) :
解析二(线程工厂、作业线程,回绝战略等):
上一篇文章咱们看了一下线程工厂、作业线程,回绝战略,先回忆一下:
  默许的线程池回绝使命处理战略AbortPolicy,直接抛出RejectedExecutionException;直接丢掉战略DiscardPolicy,丢掉旧的使命DiscardOldestPolicy,调用者履行使命CallerRunsPolicy。DiscardOldestPolicy和CallerRunsPolicy都是在线程池没封闭时,战略才收效,不然封闭直接丢掉使命。
回绝战略都为ThreadPoolExecutor的内部类。
  默许的线程工厂DefaultThreadFactory为Executors的内部类, 用于创立线程,工厂创立分组相同的线程,交由履行器履行。假如有java.lang.SecurityManager,则用System#getSecurityManager线程组,不然用调用者的线程组。创立的新线程为非看护形式,优先级在 MIN_PRIORITY和MAX_PRIORITY之间,默许为NORM_PRIORITY。能够经过Thread#getName获取线程name,默许为pool-N-thread-M,N为线程池编号,M为线程编号。
  Worker包装了使命线程,首要是为了维护中止操控状况和其他非必须状况记载,及使命的履行。Worker一起承继了AQS,在使命线程履行前lock,使命履行完unlock。加锁的意图首要是维护使命线程的履行,线程池唤醒一个使命线程等候使命,而不是中止当时正在履行使命的线程去履行使命。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的意图是以防在使命履行的进程,线程池操控办法的改动,对使命线程履行的影响,比方setCorePoolSize办法。别的为了避免使命线程在实践履行前被中止,咱们初始化锁状况为-1,在runWorker办法中,咱们会铲除它。runWorker履行使命时,首要开释锁,此刻锁翻开,答应中止,假如线程池正在stop,保证线程池已中止,不然
做履行前作业,履行使命,做履行后作业,假如使命被中止,则作业线程数量减1;
假如使命完结,则更新完结使命数量,从作业使命会集移除作业线程,测验完毕线程池。
  测验完毕线程池,首要查看线程池运转状况假如为运转中,封闭但使命行列不为空,
或线程池作业线程为0,使命行列为空,则直接回来;不然查看作业线程是否为0,不为0,则依据onlyOne参数确认中止多少闲暇线程,假如onlyOne为true,中止一个,不然中止一切闲暇线程。
今日咱们来看使命的提交和履行,假如篇幅够的话,把线程池的封闭也说一下。
先来看使命的提交履行。
首要提交使命:
//AbstractExecutorService
 public Future ? submit(Runnable task) {
 if (task == null) throw new NullPointerException();
 RunnableFuture Void ftask = newTaskFor(task, null);
 execute(ftask);
 return ftask;
 }

提交使命办法是在AbstractExecutorService中完成 ,详细的履行使命在
execute办法中,这个办法在Executor其间为笼统办法,
ThreadPoolExecutor重写了这个办法,来看execute办法
/**
 * Executes the given task sometime in the future. The task
 * may execute in a new thread or in an existing pooled thread.
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 * {@code RejectedExecutionHandler}, if the task
 * cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 * Proceed in 3 steps:
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task. The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldnt, by returning false.
 1.假如作业线程小于中心线程池数量,测验新建一个作业线程履行使命addWorker。
 addWorker将会主动查看线程池状况和作业线程数,以防在增加作业线程的进程中,
 线程池被封闭。
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 2.假如创立作业线程履行使命失利,则使命入行列,假如入行列成功,
 咱们仍需求二次查看线程池状况,以防在入行列的进程中,线程池封闭。
 假如线程池封闭,则回滚使命。
 * 3. If we cannot queue task, then we try to add a new
 * thread. If it fails, we know we are shut down or saturated
 * and so reject the task.
 假如使命入行列失利,则测验创立一个作业线程履行使命
 int c = ctl.get();
 if (workerCountOf(c) corePoolSize) {
 //假如当时作业线程数小于中心线程池数量,则增加新的作业线程履行使命
 if (addWorker(command, true))
 return;
 c = ctl.get();
 //假如当时作业线程数大于中心线程池数量,查看运转状况,假如是正在运转,则增加使命到使命行列
 if (isRunning(c) workQueue.offer(command)) {
 int recheck = ctl.get();
 //从头查看线程池运转状况,假如线程池非处于运转状况,则移除使命
 if (! isRunning(recheck) remove(command))
 reject(command);//移除成功,则进行回绝使命处理
 else if (workerCountOf(recheck) == 0)
 //如线程池已封闭,且作业线程为0,则创立一个闲暇作业线程
 addWorker(null, false);
 //依据最大线程池数量,判别是否应该增加作业线程,假如当时作业线程数量小于最大线程池数量,则测验增加
 //作业线程线程履行使命,假如测验失利,则回绝使命处理
 else if (!addWorker(command, false))
 reject(command);
 }

履行使命办法中咱们与3点要看:
1.作业线程数量小于中心线程池数量,增加作业线程,履行使命;
 int c = ctl.get();
 if (workerCountOf(c) corePoolSize) {
 //假如当时作业线程数小于中心线程池数量,则增加新的作业线程履行使命
 if (addWorker(command, true))
 return;
 c = ctl.get();
}

咱们来看addWorker增加作业线程履行使命;
 /**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked. If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread#start), we roll back cleanly.
 依据当时线程池状况和中心线程池数量与最大线程池数量,查看是否应该,
 增加作业线程履行使命。假如应该增加作业线程,则更新作业线程数,
 假如调整成功,则创立作业线程,履行使命。假如线程是已封闭或正在封闭,
 则增加作业线程失利。假如线程工厂创立线程失利,则回来false,假如因为
 线程工厂回来null或OutOfMemoryError等原因,履行回滚铲除作业。
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 private boolean addWorker(Runnable firstTask, boolean core) {
 retry:
 for (;;) {
 int c = ctl.get();
 int rs = runStateOf(c);
 // Check if queue empty only if necessary.
 if (rs = SHUTDOWN 
 ! (rs == SHUTDOWN 
 firstTask == null 
 ! workQueue.isEmpty()))
 //假如线程池已封闭或线程池正在封闭,提交的使命为null且使命行列不为空,则直接回来false
 //增加作业线程失利。
 return false;
 for (;;) {
 int wc = workerCountOf(c);
 if (wc = CAPACITY ||
 wc = (core ? corePoolSize : maximumPoolSize))
 //假如作业线程数量大于线程池容量,
 //或当时作业线程数量大于core(假如core,为true,则为corePoolSize,不然maximumPoolSize)
 return false;
 if (compareAndIncrementWorkerCount(c))
 //CAS操作作业线程数,即原子操作作业线程数+1,成功则跳出自旋
 break retry;
 c = ctl.get(); // Re-read ctl
 if (runStateOf(c) != rs)
 //假如在判别是否应该增加作业线程履行使命和CAS操作作业线程数,
 //线程状况改动,跳出本次自旋
 continue retry;
 // else CAS failed due to workerCount change; retry inner loop
 boolean workerStarted = false;//作业线程是否开端
 boolean workerAdded = false;//作业线程是否增加成功
 Worker w = null;
 try {
 final ReentrantLock mainLock = this.mainLock;
 w = new Worker(firstTask);
 final Thread t = w.thread;
 if (t != null) {
 mainLock.lock();
 try {
 // Recheck while holding lock.
 // Back out on ThreadFactory failure or if
 // shut down before lock acquired.
 int c = ctl.get();
 int rs = runStateOf(c);
 if (rs SHUTDOWN ||
 (rs == SHUTDOWN firstTask == null)) {
 //假如线程池是正在运转或线程池正在封闭,使命为null
 if (t.isAlive()) // precheck that t is startable
 //线程存活,抛出不合法线程状况反常
 throw new IllegalThreadStateException();
 //增加作业线程,到作业线程集
 workers.add(w);
 int s = workers.size();
 if (s largestPoolSize)
 //更新最大线程池数量
 largestPoolSize = s;
 workerAdded = true;
 } finally {
 mainLock.unlock();
 if (workerAdded) {
 //增加作业线程成功,则履行使命
 t.start();
 workerStarted = true;
 } finally {
 if (! workerStarted)
 //履行使命失利,则回滚作业线程和作业线程数
 addWorkerFailed(w);
 return workerStarted;
 }

再来看履行失利回滚处理:
 if (! workerStarted)
//履行使命失利,则回滚作业线程和作业线程数
 addWorkerFailed(w);

 /**
 * Rolls back the worker thread creation.
 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 * worker was holding up termination
 private void addWorkerFailed(Worker w) {
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 if (w != null)
 //从作业线程集移除作业线程
 workers.remove(w);
 //作业线程数减-1
 decrementWorkerCount();
 //查看是否线程池封闭,封闭则履行相关作业
 //这个咱们在前面说过,则里简略回忆一下
 tryTerminate();
 } finally {
 mainLock.unlock();
 }

//tryTerminate
 /**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty). If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 final void tryTerminate() {
 //自旋测验封闭线程池
 for (;;) {
 int c = ctl.get();
 //假如线程池正在运转,或正在封闭且行列不为空,则回来
 if (isRunning(c) ||
 runStateAtLeast(c, TIDYING) ||
 (runStateOf(c) == SHUTDOWN ! workQueue.isEmpty()))
 return;
 if (workerCountOf(c) != 0) { // Eligible to terminate
 //假如作业线程不为空,则中止闲暇作业线程
 interruptIdleWorkers(ONLY_ONE);
 return;
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 //线程池已封闭,使命行列为空,作业线程为0,更新线程池状况为TIDYING
 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 try {
 //履行完毕作业
 terminated();
 } finally {
 //线程池已完毕
 ctl.set(ctlOf(TERMINATED, 0));
 //唤醒等候线程池完毕的线程
 termination.signalAll();
 return;
 } finally {
 mainLock.unlock();
 // else retry on failed CAS

来看测验完毕线程池的这一点,
//假如作业线程不为空,则中止闲暇作业线程
interruptIdleWorkers(ONLY_ONE);
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 中止等候使命的闲暇非锁住状况的作业线程
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers. In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 假如onlyOne为true以为着,只中止最多一个闲暇作业线程,这个在封闭线程池时,
 调用或封闭的进程中,作业线程完结使命调用。
 private void interruptIdleWorkers(boolean onlyOne) {
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 for (Worker w : workers) {
 //遍历作业线程集
 Thread t = w.thread;
 if (!t.isInterrupted() w.tryLock()) {//锁翻开阐明,作业线程闲暇
 try {
 //假如作业线程非中止,且闲暇,测验获取锁,获取锁成功,则中止作业线程
 t.interrupt();
 } catch (SecurityException ignore) {
 } finally {
 w.unlock();
 if (onlyOne)
 //假如是只中止一个闲暇线程,则完毕本次中止闲暇线程使命
 break;
 } finally {
 mainLock.unlock();
 }

再看
//履行完毕作业
terminated();

/**
 * Method invoked when the Executor has terminated. Default
 * implementation does nothing. Note: To properly nest multiple
 * overridings, subclasses should generally invoke
 待子类扩展
 * {@code super.terminated} within this method.
 protected void terminated() { }

末节一下:
依据当时线程池状况和中心线程池数量与最大线程池数量,查看是否应该,
增加作业线程履行使命。假如应该增加作业线程,则更新作业线程数,
假如调整成功,则创立作业线程,履行使命。假如线程是已封闭或正在封闭,
则增加作业线程失利。假如线程工厂创立线程失利,则回来false,假如因为
线程工厂回来null或OutOfMemoryError等原因,履行回滚铲除作业。
回滚铲除作业首要是作业线程和作业线程数。最终查看线程是是否封闭,
假如线程池正在运转,或正在封闭且行列不为空,则直接回来,不然及线程池已封闭
,查看作业线程是否为0,不为零依据ONLY_ONE判别中止一个闲暇线程仍是多个。


2.假如增加作业线程失利,则增加使命到行列,并进行双查看,假如在上述期间,线程池封闭,
回滚使命,从行列中移除使命;
//假如当时作业线程数大于中心线程池数量,查看运转状况,假如是正在运转,则增加使命到使命行列
 if (isRunning(c) workQueue.offer(command)) {
 int recheck = ctl.get();
 //从头查看线程池运转状况,假如线程池非处于运转状况,则移除使命
 if (! isRunning(recheck) remove(command))
 reject(command);//移除成功,则进行回绝使命处理
 else if (workerCountOf(recheck) == 0)
 //如线程池非运转状况,且作业线程为0,则创立一个闲暇作业线程
 //即线程池正在封闭之后的状况,且使命行列不为空
 addWorker(null, false);
 }

这部在看了第一点之后,没有什么可看的了,一看就了解,
来一下 reject(command);
/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 final void reject(Runnable command) {
 //调用回绝使命处理器处理使命
 handler.rejectedExecution(command, this);
 }

3.假如使命入行列失利,依据作业线程数量是否大于最大线程池数量,来判别是否应该增加作业线程履行使命;
假如作业线程小于最大线程池数量,则CAS操作workCount,成功创立作业线程履行使命。
 //依据最大线程池数量,判别是否应该增加作业线程,假如当时作业线程数量小于最大线程池数量,则测验增加
//作业线程线程履行使命,假如测验失利,则回绝使命处理
 else if (!addWorker(command, false))
 reject(command);

有了前面的两点,这一点很容量了解。
在前面一篇文章中,咱们讲了作业线程,这一篇咱们简略看了一下线程池履行使命,
咱们回到上一篇的线程履行,还有一点咱们没有看:
final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();//当时线程
 Runnable task = w.firstTask;//作业线程使命
 w.firstTask = null;
 //使命线程的锁状况默许为-1,此刻解锁+1,变为0,即锁翻开状况,答应中止,在使命未履行之前,不答应中止。
 w.unlock(); // allow interrupts,
 boolean completedAbruptly = true;//完结后是否能够中止
 try {
 while (task != null || (task = getTask()) != null) {
 w.lock();
 // If pool is stopping, ensure thread is interrupted; 
 // if not, ensure thread is not interrupted. This
 // requires a recheck in second case to deal with
 // shutdownNow race while clearing interrupt
 //假如线程池正在Stop,则保证线程中止;
 //假如非处于Stop之后的状况,则判别是否中止,假如中止则判别线程池是否已封闭
 //假如线程池正在封闭,但没有中止,则中止线程池
 if ((runStateAtLeast(ctl.get(), STOP) ||
 (Thread.interrupted() 
 runStateAtLeast(ctl.get(), STOP))) 
 !wt.isInterrupted())
 wt.interrupt();
 try {
 //履行前作业
 beforeExecute(wt, task);
 Throwable thrown = null;
 try {
 //履行使命
 task.run();
 } catch (RuntimeException x) {
 thrown = x; throw x;
 } catch (Error x) {
 thrown = x; throw x;
 } catch (Throwable x) {
 thrown = x; throw new Error(x);
 } finally {
 //履行后作业
 afterExecute(task, thrown);
 } finally {
 task = null;
 //使命线程完结使命数量加1,开释锁
 w.completedTasks++;
 w.unlock();
 //使命已履行完不能够中止
 completedAbruptly = false;
 } finally {
 //处理使命完结后的作业
 processWorkerExit(w, completedAbruptly);

咱们来看使命的履行
while (task != null || (task = getTask()) != null) {

假如使命不为null,即创立作业线程成功,并履行使命,假如为null(即在线程池履行使命的时分,创立作业线程失利,使命入行列),从使命行列取一个使命。
来看从使命行列取使命:
 /**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 * a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 * workers are subject to termination (that is,
 * {@code allowCoreThreadTimeOut || workerCount corePoolSize})
 * both before and after the timed wait.
 * @return task, or null if the worker must exit, in which case
 * workerCount is decremented
 private Runnable getTask() {
 boolean timedOut = false; // Did the last poll() time out?
 retry:
 for (;;) {
 int c = ctl.get();
 int rs = runStateOf(c);
 // Check if queue empty only if necessary.
 if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) {
 假如线程池处于封闭之后或已封闭使命行列为空,则重置作业线程数
 decrementWorkerCount();
 return null;//回来null使命
 boolean timed; // Are workers subject to culling?
 for (;;) {
 int wc = workerCountOf(c);
 //假如线程池正在运转,依据是否答应闲暇线程等候使命和
 //当时作业线程与中心线程池数量比较值,判别是否需求超时等候使命
 timed = allowCoreThreadTimeOut || wc corePoolSize;
 if (wc = maximumPoolSize ! (timedOut timed))
 //假如当时作业线程数,小于最大线程数,闲暇作业线程不需求超时等候使命,
 //则跳出自旋,即在当时作业线程小于最大线程池的情况下,有作业线程可用,
 //使命行列为空。
 break;
 if (compareAndDecrementWorkerCount(c))
 //削减作业线程数量失利,回来null
 return null;
 c = ctl.get(); // Re-read ctl
 if (runStateOf(c) != rs)
 //假如与自旋前状况不一致,跳出本次自旋
 continue retry;
 // else CAS failed due to workerCount change; retry inner loop
 try {
 //假如非超时则直接take,不然等候keepAliveTime时刻,poll使命
 Runnable r = timed ?
 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 workQueue.take();
 if (r != null)
 return r;
 timedOut = true;
 } catch (InterruptedException retry) {
 timedOut = false;
 }

再来看线程池履行使命的第二点,
2.假如增加作业线程失利,则增加使命到行列,并进行双查看,假如在上述期间,线程池封闭,回滚使命,从行列中移除使命;
//假如当时作业线程数大于中心线程池数量,查看运转状况,假如是正在运转,则增加使命到使命行列
 if (isRunning(c) workQueue.offer(command)) {
 int recheck = ctl.get();
 //从头查看线程池运转状况,假如线程池非处于运转状况,则移除使命
 if (! isRunning(recheck) remove(command))
 reject(command);//移除成功,则进行回绝使命处理
 else if (workerCountOf(recheck) == 0)
 //如线程池非运转状况,且作业线程为0,则创立一个闲暇作业线程
 //即线程池正在封闭之后的状况,且使命行列不为空
 addWorker(null, false);
 }

要害的一点在addWorker(null, false)
//如线程池非运转状况,且作业线程为0,则创立一个闲暇作业线程
//即线程池正在封闭之后的状况,且使命行列不为空
addWorker(null, false);

上述代码,假如成功增加的一个空使命的作业线程,使命为空的话,则
从使命行列取使命履行,这个进程与创立作业线程失利,使命入行列相
对应。

总结:
履行使命的进程为,假如作业线程数量小于中心线程池数量,增加作业线程,履行使命;假如增加作业线程失利,则增加使命到行列,并进行双查看,假如在上述期间,线程池封闭,回滚使命,从行列中移除使命;假如使命入行列失利,依据作业线程数量是否大于最大线程池数量,来判别是否应该增加作业线程履行使命;假如作业线程小于最大线程池数量,
则CAS操作workCount,成功创立作业线程履行使命。增加作业线程的进程为,假如应该增加作业线程,则CAS更新作业线程数,假如更新成功,则创立作业线程,履行使命。假如线程是已封闭或正在封闭,则增加作业线程失利。假如线程工厂创立线程失利,则回来false,假如因为线程工厂回来null或OutOfMemoryError等原因,履行回滚铲除作业。回滚铲除作业首要是作业线程和作业线程数。最终查看线程是是否封闭,假如线程池正在运转,或正在封闭且行列不为空,则直接回来,不然及线程池已封闭,查看作业线程是否为0,不为零依据ONLY_ONE判别中止一个闲暇线程仍是多个。
ThreadPoolExecutor解析四(线程池封闭):
版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表AG环亚娱乐集团立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章

阅读排行

  • 1

    vba upgradeITeye

    文件,程序,晋级
  • 2

    java 多线程ITeye

    线程,作业,内存
  • 3
  • 4

    ClassLoaderITeye

    运用,文件,办法
  • 5
  • 6

    手机号码校验合法性ITeye

    代表,必定,第二位
  • 7
  • 8

    Java 目标巨细的核算ITeye

    目标,巨细,字节
  • 9
  • 10