Java中的线程池

大多数并发应用程序是以执行任务(task)为基本单位进行管理的。通常情况下,我们会为每个任务单独创建一个线程来执行。这样会带来两个问题:

  1. 大量的线程(>100)会消耗系统资源,使线程调度的开销变大,引起性能下降;
  2. 对于生命周期短暂的任务,频繁地创建和消亡线程并不是明智的选择。因为创建和消亡线程的开销可能会大于使用多线程带来的性能好处。

一种更加合理的使用多线程的方法是使用线程池(Thread Pool)。 java.util.concurrent 提供了一个灵活的线程池实现:Executor 框架。

一、Executor

Executor框架可以用于异步任务执行,而且支持很多不同类型的任务执行策略。它还为任务提交和任务执行之间的解耦提供了标准的方法,为使用 Runnable 描述任务提供了通用的方式。 Executor 的实现还提供了对生命周期的支持和 hook 函数,可以添加如统计收集、应用程序管理机制和监视器等扩展。

1、Executor接口

并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。

1
2
3
public interface Executor {
void execute(Runnable command);
}

该接口只有一个execute方法,执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。

Executor基于生产者消费者模式,提交任务的操作相当于生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果在程序中实现一个生产者消费者的设计,那么最简单的方式通常就是使用Executor。

1
2
3
4
5
6
7
8
public static void main(String[] args) {
Executor executor=Executors.newCachedThreadPool();
executor.execute(new Runnable(){
public void run(){
System.out.println("hello");
}
});
}

上面代码有两个问题:

  1. 使用了Executors类的工厂方法来获得了一个Executor的实例,这个类会在下面详细介绍
  2. 这个程序会一直运行下去,不会终止。因为这个接口中并没有用于管理生命周期的方法。

2、ExecutorService接口

ExecutorService继承自Executor接口,添加了一些用于生命周期管理的方法,同时还有一些用于任务提交的遍历方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

2.1、shutdown与shutdownNow

这两个方法都用于终止ExecutorService。

  • shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还没开始执行的任务)。

  • shutdownNow() 方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

    无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。

2.2、submit方法

submit方法是execute的一个扩展,它传入的参数是可返回值的Callable。

  • \ Future\ submit(Callable\ task): 提交一个返回值的任务用于执行,返回一个表示任务的返回结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
  • \ Future\ submit(Runnable task,T result):交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。 result表示返回的结果
  • Future<?> submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。

3、Excutors类

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

1
2
3
4
5
6
public static void main(String[] args) {
ExecutorService executor1=Executors.newFixedThreadPool(nThreads);
ExecutorService executor2=Executors.newCachedThreadPool();
ExecutorService executor3=Executors.newSingleThreadExecutor();
ExecutorService executor4=Executors.newScheduledThreadPool(corePoolSize);
}
  1. newFixedThreadPool:

    创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程。

  2. newCachedThreadPool:

    创建一个可缓存的线程池,如果线程池当前规模超过了处理需求时,那么将回收空闲线程,而当需求增加时,则可以添加新的线程,线程池的规模不受任何限制。

  3. newSingleThreadExecutor:

    单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建一个线程来替代。

    newSingleThreadExecutor能够确保依照任务在队列中的顺序来串行执行。

  4. newScheduledThreadPool:

    创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。

newFixedThreadPool和newCachedThreadPool这两个方法都返回ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。

二、ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

获取ThreadPoolExecutor实例有一下几个方法:

  • new关键词调用构造函数
  • Executors工厂方法获取
    • Executors.newCachedThreadPool()
    • Executors.newFixedThreadPool(int)
    • Executors.newSingleThreadExecutor()

通过Executors类获取的三种上面已经介绍过,它们为大多数使用场景预定义了设置。如果需要自己定制属性,那么就需要通过构造函数显式的创建,构造函数如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

1、线程池的大小

线程池的大小由一下两个参数决定:

  • corePoolSize:线程池的基本大小,
  • maximumPoolSize:线程池的最大大小
  • 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使有线程是空闲的。
  • 如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新线程。
  • 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池,即newFixedThreadPool
  • 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务,即newCachedThreadPool

2、任务队列

如果提交的任务超过了线程池的处理速度,那么新到达的请求将积累起来。在ThreadPoolExecutor中有一个专门管理这些Runnable的队列。它们会在这个队列中等待。基本的任务排队方法有三种:

  • 无界队列
  • 有界队列
  • 同步移交

2.1、无界队列

newFixedThreadPool(int)和newSingleThreadExecutor()在默认情况下使用一个无界的LinkedBlockingQueue。如果所有的工作者线程都处于忙碌状态,那么任务将在队列中等候。

如果任务持续快速到达,并且超过了线程池处理它们的速度,那么队列将无限制的增长。

2.2、有界队列

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue,有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况,但同时也带来一个新的问题,当队列填满之后,新来的任务怎么办?这个主要涉及到下面的饱和策略。

队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销。

LinkedBlockingQueue或者ArrayBlockingQueue可以保证先进先出,而如果使用PriorityBlockingQueue,则可以根据优先级来安排。

2.3、同步移交

工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。

SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。

  • 要将一个元素放入SynchronousQueue中,必须有一个线程正在等待接收这个元素。
  • 如果没有线程正在等待,并且线程池的大小小于最大值,那么ThreadPoolExecutor会创建一个新的线程。
  • 否则,根据饱和策略,这个任务将会被拒绝。

newCachedThreadPool()返回的ThreadPoolExecutor中就使用了SynchronousQueue。

对于Executor,newCachedThreadPool工厂方法是一个很好的默认选择,它能提供比固定大小的线程池更好的排队性能。

3、饱和策略

上面提到当有界队列被填满或者SynchronousQueue超出设置的最大值的时候,饱和策略开始发挥作用。

构造函数中看到了最后一个参数RejectedExecutionHandler handler就是饱和策略。

JDK提供了几种不同的实现:

  • AbortPolicy
  • CallerRunsPolicy
  • DiscardPolicy
  • DiscardOldestPolicy

3.1、Abort 中止

中止策略是默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。

3.2、Discard 抛弃

当新提交的任务无法保存到队列中等待执行时

  • DiscardPolicy会稍稍的抛弃该任务
  • DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。

如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。

3.3、CallerRuns 调用者运行

CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了Executor的线程中执行该任务。

4、线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。而不是传统的new Thread。

默认情况下为Executors.defaultThreadFactory(),ThreadFactory接口:

1
2
3
public interface ThreadFactory {  
Thread newThread(Runnable r);
}

我们也可以采用自定义的ThreadFactory工厂,增加对线程创建与销毁等更多的控制,并且作为参数来传入线程池中,这样就可以使用自己定义的线程工厂来创建线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadFactoryDemo implements Runnable{

static class WorkerThread extends Thread{
private Runnable target;

public WorkerThread(Runnable target, int cnt){
super(String.valueOf(cnt));
this.target=target;
}

public void run(){
try{
target.run();
}finally{
System.out.println(Thread.currentThread().getName()+"end");
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService=Executors.newCachedThreadPool(new ThreadFactory(){
private AtomicInteger count=new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
int cnt=count.incrementAndGet();
System.out.println("Thread: "+cnt+" created");
return new WorkerThread(r, cnt);
}
});

executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
executorService.execute(new ThreadFactoryDemo());
System.out.println("shutdown");
executorService.shutdown();

}
@Override
public void run() {
System.out.println("Thread: "+Thread.currentThread().getName()+" run");
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread: 1 created
Thread: 2 created
Thread: 1 run
Thread: 2 run
Thread: 3 created
Thread: 3 run
Thread: 2 run
Thread: 1 run
Thread: 4 created
Thread: 3 run
shutdown
Thread: 4 run
4end
2end
1end
3end

每次的输出都不确定,因为线程池不一定会创建多少个线程,由于任务执行的比较快,有可能一个线程执行多次任务,这样虽然打印出的run与输入的相同,但是却不一定创建了那么多个数量的线程,这也体现出了使用线程池的优势,就是减少了多次线程创建与销毁所消耗的时间。

三、参考地址

http://blog.csdn.net/linghu_java/article/details/17123057

http://www.cnblogs.com/xingele0917/p/4110634.html

http://guojuanjun.blog.51cto.com/277646/650981/