大多数并发应用程序是以执行任务(task)为基本单位进行管理的。通常情况下,我们会为每个任务单独创建一个线程来执行。这样会带来两个问题:
- 大量的线程(>100)会消耗系统资源,使线程调度的开销变大,引起性能下降;
- 对于生命周期短暂的任务,频繁地创建和消亡线程并不是明智的选择。因为创建和消亡线程的开销可能会大于使用多线程带来的性能好处。
一种更加合理的使用多线程的方法是使用线程池(Thread Pool)。 java.util.concurrent 提供了一个灵活的线程池实现:Executor 框架。
一、Executor
Executor框架可以用于异步任务执行,而且支持很多不同类型的任务执行策略。它还为任务提交和任务执行之间的解耦提供了标准的方法,为使用 Runnable 描述任务提供了通用的方式。 Executor 的实现还提供了对生命周期的支持和 hook 函数,可以添加如统计收集、应用程序管理机制和监视器等扩展。
1、Executor接口
并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。
1 | public interface Executor { |
该接口只有一个execute方法,执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。
Executor基于生产者消费者模式,提交任务的操作相当于生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果在程序中实现一个生产者消费者的设计,那么最简单的方式通常就是使用Executor。
1 | public static void main(String[] args) { |
上面代码有两个问题:
- 使用了Executors类的工厂方法来获得了一个Executor的实例,这个类会在下面详细介绍
- 这个程序会一直运行下去,不会终止。因为这个接口中并没有用于管理生命周期的方法。
2、ExecutorService接口
ExecutorService继承自Executor接口,添加了一些用于生命周期管理的方法,同时还有一些用于任务提交的遍历方法。
1 | public interface ExecutorService extends Executor { |
2.1、shutdown与shutdownNow
这两个方法都用于终止ExecutorService。
shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还没开始执行的任务)。
shutdownNow() 方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。
2.2、submit方法
submit方法是execute的一个扩展,它传入的参数是可返回值的Callable。
- \
Future\ submit(Callable\ task): 提交一个返回值的任务用于执行,返回一个表示任务的返回结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 - \
Future\ submit(Runnable task,T result):交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。 result表示返回的结果 - Future<?> submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。
3、Excutors类
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
1 | public static void main(String[] args) { |
newFixedThreadPool:
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程。
newCachedThreadPool:
创建一个可缓存的线程池,如果线程池当前规模超过了处理需求时,那么将回收空闲线程,而当需求增加时,则可以添加新的线程,线程池的规模不受任何限制。
newSingleThreadExecutor:
单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建一个线程来替代。
newSingleThreadExecutor能够确保依照任务在队列中的顺序来串行执行。
newScheduledThreadPool:
创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
newFixedThreadPool和newCachedThreadPool这两个方法都返回ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。
二、ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。
获取ThreadPoolExecutor实例有一下几个方法:
- new关键词调用构造函数
- Executors工厂方法获取
- Executors.newCachedThreadPool()
- Executors.newFixedThreadPool(int)
- Executors.newSingleThreadExecutor()
通过Executors类获取的三种上面已经介绍过,它们为大多数使用场景预定义了设置。如果需要自己定制属性,那么就需要通过构造函数显式的创建,构造函数如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
1、线程池的大小
线程池的大小由一下两个参数决定:
- corePoolSize:线程池的基本大小,
- maximumPoolSize:线程池的最大大小
- 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使有线程是空闲的。
- 如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程。
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池,即newFixedThreadPool
- 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务,即newCachedThreadPool
2、任务队列
如果提交的任务超过了线程池的处理速度,那么新到达的请求将积累起来。在ThreadPoolExecutor中有一个专门管理这些Runnable的队列。它们会在这个队列中等待。基本的任务排队方法有三种:
- 无界队列
- 有界队列
- 同步移交
2.1、无界队列
newFixedThreadPool(int)和newSingleThreadExecutor()在默认情况下使用一个无界的LinkedBlockingQueue。如果所有的工作者线程都处于忙碌状态,那么任务将在队列中等候。
如果任务持续快速到达,并且超过了线程池处理它们的速度,那么队列将无限制的增长。
2.2、有界队列
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue,有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况,但同时也带来一个新的问题,当队列填满之后,新来的任务怎么办?这个主要涉及到下面的饱和策略。
队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销。
LinkedBlockingQueue或者ArrayBlockingQueue可以保证先进先出,而如果使用PriorityBlockingQueue,则可以根据优先级来安排。
2.3、同步移交
工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。
SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。
- 要将一个元素放入SynchronousQueue中,必须有一个线程正在等待接收这个元素。
- 如果没有线程正在等待,并且线程池的大小小于最大值,那么ThreadPoolExecutor会创建一个新的线程。
- 否则,根据饱和策略,这个任务将会被拒绝。
newCachedThreadPool()返回的ThreadPoolExecutor中就使用了SynchronousQueue。
对于Executor,newCachedThreadPool工厂方法是一个很好的默认选择,它能提供比固定大小的线程池更好的排队性能。
3、饱和策略
上面提到当有界队列被填满或者SynchronousQueue超出设置的最大值的时候,饱和策略开始发挥作用。
构造函数中看到了最后一个参数RejectedExecutionHandler handler就是饱和策略。
JDK提供了几种不同的实现:
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy
3.1、Abort 中止
中止策略是默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。
3.2、Discard 抛弃
当新提交的任务无法保存到队列中等待执行时
- DiscardPolicy会稍稍的抛弃该任务
- DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。
如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。
3.3、CallerRuns 调用者运行
CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了Executor的线程中执行该任务。
4、线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。而不是传统的new Thread。
默认情况下为Executors.defaultThreadFactory(),ThreadFactory接口:
1 | public interface ThreadFactory { |
我们也可以采用自定义的ThreadFactory工厂,增加对线程创建与销毁等更多的控制,并且作为参数来传入线程池中,这样就可以使用自己定义的线程工厂来创建线程。
1 | import java.util.concurrent.ExecutorService; |
输出
1 | Thread: 1 created |
每次的输出都不确定,因为线程池不一定会创建多少个线程,由于任务执行的比较快,有可能一个线程执行多次任务,这样虽然打印出的run与输入的相同,但是却不一定创建了那么多个数量的线程,这也体现出了使用线程池的优势,就是减少了多次线程创建与销毁所消耗的时间。
三、参考地址
http://blog.csdn.net/linghu_java/article/details/17123057