Java的Concurrent包

Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包。这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类。包中内容大致有下面一些:

  • atomic文件夹:里面包含了一系列的基本数据类型对应的原子类比如AtomicInteger
  • locks文件夹:里面包含了显示锁Lock和相关的Condition、ReentrantLock类、读写锁ReentrantReadWriteLock类和很多同步工具都使用的AQS即AbstractQueuedLongSynchronizer类
  • 同步容器类:
    • 写时复制队列:CopyOnWriteArrayList
    • 阻塞队列:BlockingQueue类
    • HashMap:ConcurrentHashMap
  • 同步工具类:
    • 闭锁:CountDownLatch
    • 信号量:Semaphore
    • 栅栏:CyclicBarrier

一、同步工具类

1、CountDownLatch

在多线程程序设计中,经常会遇到一个线程等待一个或多个线程的场景

  • 如果是一个线程等待一个线程,则可以通过wait()和notify()来实现;
  • 如果是一个线程等待多个线程,则就可以使用CountDownLatch和CyclicBarrier来实现比较好的控制。

CountDownLatch是一种灵活实现的闭锁,在Java中的实现类为java.util.concurrent.CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CountDownLatchDemo {
public long timeTasks(int n, final Runnable task) throws InterruptedException{

final java.util.concurrent.CountDownLatch startGate=new CountDownLatch(1);
final CountDownLatch endGate=new CountDownLatch(n);

for(int i=0; i<n; i++){
Thread t=new Thread(){
public void run(){
try {
startGate.await();//所有线程都要先等待,然后统一开始执行
try{
task.run();
}finally{//latch.countDown()建议放在finally里面执行
endGate.countDown();//执行之后count-,知道为0,代表所有线程执行完毕
}
} catch (InterruptedException e) {
}
}
};
t.start();
}
long start=System.nanoTime();
startGate.countDown();//统一开始执行
endGate.await();//等待所有线程执行完毕
long end=System.nanoTime();
return end-start;
}
}

2、CycleBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CyclicBarrierDemo {

static class MyThread implements Runnable{
private CyclicBarrier barrier;
public MyThread(CyclicBarrier barrier){
this.barrier=barrier;
}

@Override
public void run() {
try{
System.out.println("worker is waiting");
barrier.await();
System.out.println("ID:"+Thread.currentThread().getId()+" Working");
}catch(Exception e){

}
}
}

public static void main(String[] args) {
CyclicBarrier barrier=new CyclicBarrier(5, new Runnable(){
@Override
public void run() {
System.out.println("Inseide barrier");
}

});
for(int i=0; i<5; i++){
new Thread(new MyThread(barrier)).start();
}
}
}
  1. CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
  2. CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
  3. CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

CountDownLatch与CyclicBarrier比较

  1. CountDownLatch是一次性的,而CyclicBarrier在调用reset之后还可以继续使用。
  2. CountDownLatch是一个线程等待N个线程全部完成(计数为0),这个线程才可以继续执行。
  3. CyclicBarrier是多个线程之间相互等待,所有线程都完成了(到达初始化时指定的数值),再一起继续执行。

3、Semaphore

Java中实现计数信号量的类为java.util.concurrent.Semaphore,是在1.5中引入的。

  • Semaphore中管理着一组许可,许可的初始数量可以通过构造方法来指定。
  • 在执行操作时需要先获得许可(acquire),并在使用完后释放许可(release)。
  • 如果当前没有许可,那么acquire将阻塞下到有许可可用,或者直到被中断,或者操作超时。
  • 当初始值为1时,该信号量就可以实现互斥锁的功能。

Semaphore可以用于实现资源池,例如数据库的连接池。可以构建一个固定长度的资源池,当池为空时,请求资源就会失败,但Semaphore可以实现阻塞而非失败,并且当资源非空时接触阻塞。

可以使用Semaphore将任何容器变成有界的阻塞容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BoundedList<T> {  

private final List<T> list;
private final Semaphore semaphore;

public BoundedList(int bound) {
list = Collections.synchronizedList(new LinkedList<T>());
semaphore = new Semaphore(bound);
}

public boolean add(T obj) throws InterruptedException {
semaphore.acquire();
boolean addedFlag = false;
try {
addedFlag = list.add(obj);
}
finally {
if (!addedFlag) {
semaphore.release();
}
}
return addedFlag;
}

public boolean remove(Object obj) {
boolean removedFlag = list.remove(obj);
if (removedFlag) {
semaphore.release();
}
return removedFlag;
}

}

二、AbstractQueuedSynchronizer类

上面介绍的几个类,他们在实现的时候都用到了一个共同的基类,就是AbstractQueuedSynchronizer(AQS)类。

ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、FuturTask等类都使用到了这个类。

他们都有各自获取锁的方法,同时相对于Java的内置锁,他们具有明显的优势:花最小的空间开销创建锁、最少的时间开销获得锁、使用更加方便灵活。

AQS解决了再实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。

在基于AQS的同步器类中,最基本的操作暴多各种形式的获取操作释放操作

下面是书中给出的AQS中获取操作与释放操作的形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean acauire() throws InterruptedException{
while(当前状态不允许获取操作){
if(需要阻塞获取操作){
如果当前县城不在队列中,则将其插入队列中
阻塞当前线程
}else
返回失败
}
可能更新同步器的状态
如果线程位于队列中,则将移出队列
返回成功
}

void release(){
更新同步器状态
if(新的状态允许某个被阻塞的线程获取成功){
解除队列中一个或多个线程的阻塞状态
}
}

发现await、signal方法的使用就是以这个为基础的。每个同步类中都可以看到一个内部类继承自AQS。

三、参考地址

http://www.cnblogs.com/techyc/archive/2013/03/13/2957059.html