Tomcat 线程池学习总结(tomcat线程池 threadlocal)

  本篇文章为你整理了Tomcat 线程池学习总结(tomcat线程池 threadlocal)的详细内容,包含有tomcat 线程池配置与优化 tomcat线程池 threadlocal tomcat线程池满应该如何处理 tomcat线程池调优 Tomcat 线程池学习总结,希望能帮助你了解 Tomcat 线程池学习总结。

  Tomcat线程池介绍

  Tomcat线程池,源于JAVA JDK自带线程池。由于JAVA JDK线程池策略,比较适合处理 CPU 密集型任务,但是对于 I/O 密集型任务,如数据库查询,rpc 请求调用等,不是很友好,所以Tomcat在其基础上进行了扩展。

  任务处理流程

  扩展线程池相关源码简析

  Tomcat中定义了一个StandardThreadExecutor类,该类实现了org.apache.catalina.Executor,org.apache.tomcat.util.threads.ResizableExecutor接口

  该类内部定义了namePrefix(创建的线程名称前缀,默认值tomcat-exec-),maxThreads(最大线程数,默认值 200),minSpareThreads(最小线程数,即核心线程数,默认值 25),maxIdleTime(线程最大空闲时间,毫秒为单位,默认值60秒),maxQueueSize(最大队列大小,默认值 Integer.MAX_VALUE)等属性,此外,还定义了一个org.apache.tomcat.util.threads.ThreadPoolExecutor类型的执行器对象,一个execute(Runnable command) 方法

  当execute(Runnable command) 方法被调用时,会调用上述ThreadPoolExecutor类对象的execute方法

  org.apache.catalina.core.StandardThreadExecutor.java

  

import org.apache.catalina.Executor;

 

  import org.apache.catalina.LifecycleException;

  import org.apache.catalina.LifecycleState;

  import org.apache.catalina.util.LifecycleMBeanBase;

  import org.apache.tomcat.util.res.StringManager;

  import org.apache.tomcat.util.threads.ResizableExecutor;

  import org.apache.tomcat.util.threads.TaskQueue;

  import org.apache.tomcat.util.threads.TaskThreadFactory;

  import org.apache.tomcat.util.threads.ThreadPoolExecutor;

  public class StandardThreadExecutor extends LifecycleMBeanBase

   implements Executor, ResizableExecutor {

   protected static final StringManager sm = StringManager.getManager(StandardThreadExecutor.class);

   // ---------------------------------------------- Properties

   * Default thread priority

   protected int threadPriority = Thread.NORM_PRIORITY;

   * Run threads in daemon or non-daemon state

   protected boolean daemon = true;

   * Default name prefix for the thread name

   protected String namePrefix = "tomcat-exec-";

   * max number of threads

   protected int maxThreads = 200;

   * min number of threads

   protected int minSpareThreads = 25;

   * idle time in milliseconds

   protected int maxIdleTime = 60000;

   * The executor we use for this component

   protected ThreadPoolExecutor executor = null;

   * the name of this thread pool

   protected String name;

   * The maximum number of elements that can queue up before we reject them

   protected int maxQueueSize = Integer.MAX_VALUE;

   * After a context is stopped, threads in the pool are renewed. To avoid

   * renewing all threads at the same time, this delay is observed between 2

   * threads being renewed.

   protected long threadRenewalDelay =

   org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;

   private TaskQueue taskqueue = null;

   // ---------------------------------------------- Constructors

   public StandardThreadExecutor() {

   //empty constructor for the digester

   //....此处代码已省略

   @Override

   public void execute(Runnable command) {

   if (executor != null) {

   // Note any RejectedExecutionException due to the use of TaskQueue

   // will be handled by the o.a.t.u.threads.ThreadPoolExecutor

   executor.execute(command);

   } else {

   throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));

   //....此处代码已省略

  

 

  当org.apache.tomcat.util.threads.ThreadPoolExecuto类对象的execute(Runnable command) 方法被调用时,会调用该类定义的一个executeInternal方法,并在捕获到RejectedExecutionException异常时,尝试再次将任务放入工作队列中。

  executeInternal方法中,通过代码可知,当前线程数小于核心线程池大小时,会创建新线程,否则,会调用workQueue对象(org.apache.tomcat.util.threads.TaskQueue类型)的offer方法,将任务进行排队。Tomcat通过控制workQueue.offer()方法的返回值,实现了当前线程数超过核心线程池大小时,优先创建线程,而不是让任务排队。

  org.apache.tomcat.util.threads.ThreadPoolExecutor

  

public class ThreadPoolExecutor extends AbstractExecutorService {

 

   //...此处代码已省略

   @Override

   public void execute(Runnable command) {

   submittedCount.incrementAndGet();

   try {

   executeInternal(command);

   } catch (RejectedExecutionException rx) {

   if (getQueue() instanceof TaskQueue) {

   // If the Executor is close to maximum pool size, concurrent

   // calls to execute() may result (due to Tomcats use of

   // TaskQueue) in some tasks being rejected rather than queued.

   // If this happens, add them to the queue.

   final TaskQueue queue = (TaskQueue) getQueue();

   if (!queue.force(command)) {

   submittedCount.decrementAndGet();

   throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));

   } else {

   submittedCount.decrementAndGet();

   throw rx;

  
* Executes the given task sometime in the future. The task

   * may execute in a new thread or in an existing pooled thread.

   * If the task cannot be submitted for execution, either because this

   * executor has been shutdown or because its capacity has been reached,

   * the task is handled by the current {@link RejectedExecutionHandler}.

   * @param command the task to execute

   * @throws RejectedExecutionException at discretion of

   * {@code RejectedExecutionHandler}, if the task

   * cannot be accepted for execution

   * @throws NullPointerException if {@code command} is null

   private void executeInternal(Runnable command) {

   if (command == null) {

   throw new NullPointerException();

   * Proceed in 3 steps:

   * 1. If fewer than corePoolSize threads are running, try to

   * start a new thread with the given command as its first

   * task. The call to addWorker atomically checks runState and

   * workerCount, and so prevents false alarms that would add

   * threads when it shouldnt, by returning false.

   * 2. If a task can be successfully queued, then we still need

   * to double-check whether we should have added a thread

   * (because existing ones died since last checking) or that

   * the pool shut down since entry into this method. So we

   * recheck state and if necessary roll back the enqueuing if

   * stopped, or start a new thread if there are none.

   * 3. If we cannot queue task, then we try to add a new

   * thread. If it fails, we know we are shut down or saturated

   * and so reject the task.

   int c = ctl.get();

   if (workerCountOf(c) corePoolSize) { // 当前线程数小于核心线程数时,

   if (addWorker(command, true)) { // 创建线程

   return;

   c = ctl.get();

   if (isRunning(c) workQueue.offer(command)) { //workQueue.offer(command)为false时,会走以下的else if分支,创建线程

   int recheck = ctl.get();

   if (! isRunning(recheck) remove(command)) {

   reject(command);

   } else if (workerCountOf(recheck) == 0) {

   addWorker(null, false);

   else if (!addWorker(command, false)) {

   reject(command);

   //...此处代码已省略

  

 

  org.apache.tomcat.util.threads.TaskQueue继承于java.util.concurrent.LinkedBlockingQueue,并重写了offer(排队任务的方法),该方法中,当当前线程数大于核心线程数,小于最大线程数时,返回false,导致上述executeInternal方法中workQueue.offer(command)为false,进而导致该分支代码不被执行,执行addWorker(command, false)方法,创建新线程。

  org.apache.tomcat.util.threads.TaskQueue

  

import java.util.Collection;

 

  import java.util.concurrent.LinkedBlockingQueue;

  import java.util.concurrent.RejectedExecutionException;

  import java.util.concurrent.TimeUnit;

  import org.apache.tomcat.util.res.StringManager;

   * As task queue specifically designed to run with a thread pool executor. The

   * task queue is optimised to properly utilize threads within a thread pool

   * executor. If you use a normal queue, the executor will spawn threads when

   * there are idle threads and you wont be able to force items onto the queue

   * itself.

  public class TaskQueue extends LinkedBlockingQueue Runnable {

   //...此处代码已省略

   * Used to add a task to the queue if the task has been rejected by the Executor.

   * @param o The task to add to the queue

   * @return {@code true} if the task was added to the queue,

   * otherwise {@code false}

   public boolean force(Runnable o) {

   if (parent == null parent.isShutdown()) {

   throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));

   return super.offer(o); //forces the item onto the queue, to be used if the task is rejected

  
//we are maxed out on threads, simply queue the object

   if (parent.getPoolSize() == parent.getMaximumPoolSize()) {

   return super.offer(o);

   //we have idle threads, just add it to the queue

   if (parent.getSubmittedCount() =(parent.getPoolSize())) {

   return super.offer(o);

   //if we have less threads than maximum force creation of a new thread

   if (parent.getPoolSize() parent.getMaximumPoolSize()) {

   return false;

   //if we reached here, we need to add it to the queue

   return super.offer(o);

   //...此处代码已省略

  

 

  https://gitee.com/apache/tomcat/blob/10.1.x/java/org/apache/catalina/core/StandardThreadExecutor.java

  以上就是Tomcat 线程池学习总结(tomcat线程池 threadlocal)的详细内容,想要了解更多 Tomcat 线程池学习总结的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: