在Java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法。
1. CountDownLatch用法
直译过来就是倒计数(CountDown)门闩(Latch)。倒计数不用说,门闩的意思顾名思义就是阻止前进。在这里就是指 CountDownLatch.await() 方法在倒计数为0之前会阻塞当前线程。
CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。
1.1. CountDownLatch接口
CountDownLatch类只提供了一个构造器:
1 | public CountDownLatch(int count) { }; // 参数count为计数值 |
然后下面这3个方法是CountDownLatch类中最重要的方法:
1 | // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 |
1.2. CountDownLatch应用例子
下面看一个例子大家就清楚CountDownLatch的用法了:
下面的这个例子可以理解为 F1 赛车的维修过程,只有 startSignal (可以表示停车,可能名字不太贴合)命令下达之后,维修工才开始干活,只有等所有工人完成工作之后,赛车才能继续。
1 | package com.wxweven.concurrent; |
执行结果:
1 | 正在执行准备工作... |
2. CyclicBarrier用法
CyclicBarrier 翻译过来叫循环栅栏、循环障碍什么的。它主要的方法就是一个:await()
。await() 方法每被调用一次,计数便会减少1,并阻塞住当前线程。当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。
CyclicBarrier 的使用并不难,但需要注意它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它同样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。意思就是说,同志们,别等了,有个小伙伴已经挂了,咱们如果继续等有可能会一直等下去,所有各回各家吧。
CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。
和 CountDownLatch 一样,CyclicBarrier 同样可以可以在构造函数中设定总计数值。与 CountDownLatch 不同的是,CyclicBarrier 的构造函数还可以接受一个 Runnable,会在 CyclicBarrier 被释放时执行。
2.1. CyclicBarrier接口
CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:
1 | public CyclicBarrier(int parties, Runnable barrierAction) { |
参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
1 | public int await() throws InterruptedException, BrokenBarrierException { }; |
第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
2.2. CyclicBarrier应用例子
下面举几个例子就明白了:
假若有若干个线程对一个数组的不同部分进行赋值,并且只有所有线程都完成赋值之后,才能对数组进行最后的汇总,此时就可以利用CyclicBarrier了:
1 | package com.wxweven.concurrent; |
执行结果:
1 | 正在执行准备工作... |
从上面输出结果可以看出,每个写入线程对数组赋值完成之后,就在等待其他线程赋值完毕。
当所有线程线程赋值完毕之后,就可以对数组进行汇总操作了。
CyclicBarrier提供的Runnable参数,用于在所有线程工作完成后需要执行的任务,(比如上述例子中,所有线程执行完对数组的赋任务后,还需要执行一个汇总的任务),CyclicBarrier会将该任务 随机分配 给已完成工作的线程去执行(这一点设计也是很优秀的,直接复用之前的线程,而不是再去起新的线程了!),比如示例运行中的线程3。
2.3. CyclicBarrier设置等待超时
下面看一下为await指定时间的效果:
1 | package com.wxweven.concurrent; |
执行结果:
1 | 正在执行准备工作... |
上面的代码46行中,故意让最后一个线程启动延迟,因为在前面三个线程都达到barrier之后,等待了指定的时间发现第四个线程还没有达到barrier,就抛出异常并继续执行后面的任务。
2.4. CyclicBarrier重用
另外CyclicBarrier是可以重用的,看下面这个例子:
1 | package com.wxweven.concurrent; |
执行结果:
1 | 正在执行准备工作... |
从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。
3. Semaphore用法
Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
3.1. Semaphore接口
Semaphore类位于java.util.concurrent包下,它提供了2个构造器:
1 | public Semaphore(int permits) { |
下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:
1 | public void acquire() throws InterruptedException { } // 获取一个许可 |
acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
1 | // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false |
另外还可以通过availablePermits()方法得到可用的许可数目。
3.2. Semaphore应用例子
下面通过一个例子来看一下Semaphore的具体使用:
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:
1 | package com.wxweven.concurrent; |
执行结果:
1 | 工人1开始工作,占用一台机器... |
4. 总结
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
- CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
- 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
- 另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。