java.util.concurrent 中包含了几个能帮助人们管理相互合作的同步工具类,本文介绍了其中的 CountDownLatch、 CyclicBarrier 以及 Semaphore。
CountDownLatch
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。CountDownLatch 是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件的发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生,而 await 方法等待计数器达到零,这表示需要等待的事情都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
下面这个例子给出了闭锁的两种常见用法。startLatch 计数器初始值为 1, endLatch 计数器初始值为工作线程的数量。每个工作线程首先要做的就是在 startLatch 上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是调用 endLatch 的 countDown 方法,这能使主线程高效的等待直到所有的线程都执行完成。
1 | // 修改自《Java Concurrency in Practice》 5-11 的 TestHarness |
CountDownLatch 是通过 AbstractQueuedSynchronizer(AQS)实现的,具体细节可参考 这里。
CyclicBarrier
我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏和闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用,有兴趣的可以看下 CyclicBarrier example: a parallel sort algorithm。
当现成达到栅栏位置时将调用 await 方法这个方法将阻塞直到所有线程都达到栅栏位置。如果所有线程都达到了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置一遍下次使用。如果对 await 的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是被打破了,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException。如果成功通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号。CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。下面是一个 Two Step 的例子。
1 | import java.util.concurrent.*; |
有关 CountDownLatch 和 CyclicBarrier 区别的讨论,请移步 Stack Overflow。
Semaphore
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的访问数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟的许可(permit),许可的初始容量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。二值信号量(即初始值为 1 的 Semaphore)可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。因此,它可以用来 实现生产者-消费者模式。
下面是一个 Semaphore 的实例。
1 | import java.util.concurrent.*; |
推荐阅读
Brian Goetz 等. Java并发编程实战[M]. 机械工业出版社, 2012.
CyclicBarrier: concordinating the stages of a multithreaded operation