Tomcat - 线程池的设计与实现:StandardThreadExecutor

上文中我们研究了下Service的设计和实现,StandardService中包含Executor的调用;这个比较好理解,Tomcat需要并发处理用户的请求,自然而言就想到线程池,那么Tomcat中线程池(Executor)具体是如何实现的?本文带你继续深度解析。@pdai

理解思路

我们如下几个方面开始引入线程池的,这里主要从上文Service引入,保持上下文之间的衔接,会很好的构筑你的知识体系。@pdai

  • 上文中我们了解到,Executor是包含在Service中的,Service中关于Executor的配置和相关代码如下:

server.xml中service里包含Executor的配置

<Service name="Catalina">
<!-- 1. 属性说明
	name:Service的名称
-->

    <!--2. 一个或多个excecutors --> // 看这里
    <!--
    <Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
        maxThreads="150" minSpareThreads="4"/>
    -->
</Service>    
1
2
3
4
5
6
7
8
9
10
11

Service中executors相关方法

/**
  * Adds a named executor to the service
  * @param ex Executor
  */
@Override
public void addExecutor(Executor ex) {
    synchronized (executors) {
        if (!executors.contains(ex)) {
            executors.add(ex);
            if (getState().isAvailable()) {
                try {
                    ex.start(); // 启动
                } catch (LifecycleException x) {
                    log.error(sm.getString("standardService.executor.start"), x);
                }
            }
        }
    }
}

/**
  * Retrieves all executors
  * @return Executor[]
  */
@Override
public Executor[] findExecutors() {
    synchronized (executors) {
        Executor[] arr = new Executor[executors.size()];
        executors.toArray(arr);
        return arr;
    }
}


/**
  * Retrieves executor by name, null if not found
  * @param executorName String
  * @return Executor
  */
@Override
public Executor getExecutor(String executorName) {
    synchronized (executors) {
        for (Executor executor: executors) {
            if (executorName.equals(executor.getName()))
                return executor;
        }
    }
    return null;
}

/**
  * Removes an executor from the service
  * @param ex Executor
  */
@Override
public void removeExecutor(Executor ex) {
    synchronized (executors) {
        if ( executors.remove(ex) && getState().isAvailable() ) {
            try {
                ex.stop(); // 停止
            } catch (LifecycleException e) {
                log.error(sm.getString("standardService.executor.stop"), e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  • 和Server、Service实现一样,StandardThreadExecutor也是继承LifecycleMBeanBase;然后实现Executor的接口。

  • Tomcat关于Executor相关的配置文档

http://tomcat.apache.org/tomcat-9.0-doc/config/executor.html

准备知识

在理解Tomcat的线程池时,需要有一定的基础,这里推荐学习下JDK关于线程池的设计和实现。

并发基础

  • Java 并发 - 理论基础

    • 多线程的出现是要解决什么问题的?
    • 线程不安全是指什么? 举例说明
    • 并发出现线程不安全的本质什么? 可见性,原子性和有序性。
    • Java是怎么解决并发问题的? 3个关键字,JMM和8个Happens-Before
    • 线程安全是不是非真即假? 不是
    • 线程安全有哪些实现思路?
    • 如何理解并发和并行的区别?
  • Java 并发 - 线程基础

    • 线程有哪几种状态? 分别说明从一种状态到另一种状态转变有哪些方式?
    • 通常线程有哪几种使用方式?
    • 基础线程机制有哪些?
    • 线程的中断方式有哪些?
    • 线程的互斥同步方式有哪些? 如何比较和选择?
    • 线程之间有哪些协作方式?

JUC BlockingQueue 和 ThreadPoolExecutor

  • JUC集合: BlockingQueue详解

    • 什么是BlockingDeque?
    • BlockingQueue大家族有哪些? ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, SynchronousQueue...
    • BlockingQueue适合用在什么样的场景?
    • BlockingQueue常用的方法?
    • BlockingQueue插入方法有哪些? 这些方法(add(o),offer(o),put(o),offer(o, timeout, timeunit))的区别是什么?
    • BlockingDeque 与BlockingQueue有何关系,请对比下它们的方法?
    • BlockingDeque适合用在什么样的场景?
    • BlockingDeque大家族有哪些?
    • BlockingDeque 与BlockingQueue实现例子?
  • JUC线程池: ThreadPoolExecutor详解

    • 为什么要有线程池?
    • Java是实现和管理线程池有哪些方式? 请简单举例如何使用。
    • 为什么很多公司不允许使用Executors去创建线程池? 那么推荐怎么使用呢?
    • ThreadPoolExecutor有哪些核心的配置参数? 请简要说明
    • ThreadPoolExecutor可以创建哪是哪三种线程池呢?
    • 当队列满了并且worker的数量达到maxSize的时候,会怎么样?
    • 说说ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默认是什么策略?
    • 简要说下线程池的任务执行机制? execute –> addWorker –>runworker (getTask)
    • 线程池中任务是如何提交的?
    • 线程池中任务是如何关闭的?
    • 在配置线程池的时候需要考虑哪些配置因素?
    • 如何监控线程池的状态?

Executor接口设计

Executor的设计很简单,在理解的时候需要理解两点:

  • 1.Tomcat希望将Executor也纳入Lifecycle生命周期管理,所以让它实现了Lifecycle接口
  • 2.引入超时机制:也就是说当work queue满时,会等待指定的时间,如果超时将抛出RejectedExecutionException,所以这里增加了一个void execute(Runnable command, long timeout, TimeUnit unit)方法; 其实本质上,它构造了JUC中ThreadPoolExecutor,通过它调用ThreadPoolExecutor的void execute(Runnable command, long timeout, TimeUnit unit)方法。
public interface Executor extends java.util.concurrent.Executor, Lifecycle {

    public String getName();

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <code>Executor</code> implementation.
     * If no threads are available, it will be added to the work queue.
     * If the work queue is full, the system will wait for the specified
     * time until it throws a RejectedExecutionException
     *
     * @param command the runnable task
     * @param timeout the length of time to wait for the task to complete
     * @param unit    the units in which timeout is expressed
     *
     * @throws java.util.concurrent.RejectedExecutionException if this task
     * cannot be accepted for execution - the queue is full
     * @throws NullPointerException if command or unit is null
     */
    void execute(Runnable command, long timeout, TimeUnit unit);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

找到Executor的实现类

StandardThreadExecutor的实现

接下来我们看下具体的实现类StandardThreadExecutor。

理解相关配置参数

Executor官方配置说明文档 (opens new window)

  • 公共属性

Executor的所有实现都 支持以下属性:

属性 描述
className 实现的类。实现必须实现 org.apache.catalina.Executor接口。此接口确保可以通过其name属性引用对象并实现Lifecycle,以便可以使用容器启动和停止对象。className的默认值是org.apache.catalina.core.StandardThreadExecutor
name 用于在server.xml中的其他位置引用此池的名称。该名称是必需的,必须是唯一的。
  • StandardThreadExecutor属性

默认实现支持以下属性:

属性 描述
threadPriority (int)执行程序中线程的线程优先级,默认为 5(Thread.NORM_PRIORITY常量的值)
daemon (boolean)线程是否应该是守护程序线程,默认为 true
namePrefix (字符串)执行程序创建的每个线程的名称前缀。单个线程的线程名称将是namePrefix+threadNumber
maxThreads (int)此池中活动线程的最大数量,默认为 200
minSpareThreads (int)最小线程数(空闲和活动)始终保持活动状态,默认为 25
maxIdleTime (int)空闲线程关闭之前的毫秒数,除非活动线程数小于或等于minSpareThreads。默认值为60000(1分钟)
maxQueueSize (int)在我们拒绝之前可以排队等待执行的可运行任务的最大数量。默认值是Integer.MAX_VALUE
prestartminSpareThreads (boolean)是否应该在启动Executor时启动minSpareThreads,默认值为 false
threadRenewalDelay (long)如果配置了ThreadLocalLeakPreventionListener,它将通知此执行程序有关已停止的上下文。上下文停止后,池中的线程将被更新。为避免同时更新所有线程,此选项在任意2个线程的续订之间设置延迟。该值以ms为单位,默认值为1000ms。如果值为负,则不会续订线程。

Lifecycle模板方法

先看核心变量:

// 任务队列
private TaskQueue taskqueue = null;

// 包装了一个ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
1
2
3
4
5
  • initInternaldestroyInternal默认父类实现
@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
}
@Override
protected void destroyInternal() throws LifecycleException {
    super.destroyInternal();
}
1
2
3
4
5
6
7
8
  • startInternal方法

这个方法中,我们不难看出,就是初始化taskqueue,同时构造ThreadPoolExecutor的实例,后面Tomcat的StandardThreadExecutor的实现本质上通过ThreadPoolExecutor实现的。

/**
  * Start the component and implement the requirements
  * of {@link org.apache.catalina.util.LifecycleBase#startInternal()}.
  *
  * @exception LifecycleException if this component detects a fatal error
  *  that prevents this component from being used
  */
@Override
protected void startInternal() throws LifecycleException {

    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
    executor.setThreadRenewalDelay(threadRenewalDelay);
    if (prestartminSpareThreads) {
        executor.prestartAllCoreThreads();
    }
    taskqueue.setParent(executor);

    setState(LifecycleState.STARTING);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  • stopInternal方法

代码很简单,关闭线程池后置null, 方便GC回收。

/**
  * Stop the component and implement the requirements
  * of {@link org.apache.catalina.util.LifecycleBase#stopInternal()}.
  *
  * @exception LifecycleException if this component detects a fatal error
  *  that needs to be reported
  */
@Override
protected void stopInternal() throws LifecycleException {

    setState(LifecycleState.STOPPING);
    if (executor != null) {
        executor.shutdownNow();
    }
    executor = null;
    taskqueue = null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

核心executor方法

本质上就是调用ThreadPoolExecutor的实例的相关方法。

@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
    if (executor != null) {
        executor.execute(command,timeout,unit);
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}


@Override
public void execute(Runnable command) {
    if (executor != null) {
        try {
            executor.execute(command);
        } catch (RejectedExecutionException rx) {
            //there could have been contention around the queue
            if (!((TaskQueue) executor.getQueue()).force(command)) {
                throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
            }
        }
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

动态调整线程池

我们还注意到StandardThreadExecutor还实现了ResizeableExecutor,从名称上我们就可知道它是希望实现对线程池的动态调整,所以呢,它封装了一个ResizeableExecutor的接口,看下接口。

public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

前三个方法比较简单,我们看下后两个方法是如何实现的, 其实也很简单。

@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
    if (executor == null)
        return false;

    executor.setCorePoolSize(corePoolSize);
    executor.setMaximumPoolSize(maximumPoolSize);
    return true;
}

// 默认没有实现
@Override
public boolean resizeQueue(int capacity) {
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

补充TaskQueue

我们知道工作队列是有TaskQueue保障的,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。

/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;

    private transient volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the listeners)
    private int forcedRemainingCapacity = -1;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
            // ThreadPoolExecutor.setCorePoolSize checks that
            // remainingCapacity==0 to allow to interrupt idle threads
            // I don't see why, but this hack allows to conform to this
            // "requirement"
            return forcedRemainingCapacity;
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }

    void resetForcedRemainingCapacity() {
        this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。

如果你使用一个普通的队列,当有空闲线程executor将产生线程并且你不能强制将任务添加到队列。

为什么不是直接使用ThreadPoolExecutor

这里你是否考虑过一个问题,为什么Tomcat会自己构造一个StandardThreadExecutor而不是直接使用ThreadPoolExecutor?

从上面的代码,你会发现这里只是使用executor只是使用了execute的两个主要方法,它希望让调用层屏蔽掉ThreadPoolExecutor的其它方法:

  • 它体现的原则:最少知识原则: 只和你的密友谈话。也就是说客户对象所需要交互的对象应当尽可能少

  • 它体现的设计模式:结构型 - 外观(Facade)

    • 外观模式(Facade pattern),它提供了一个统一的接口,用来访问子系统中的一群接口,从而让子系统更容易使用

联系我

添加@pdai微信

PS:添加时请备注Java全栈,谢谢!