Java 并发基础(四):再谈 CyclicBarrier

你好,我是看山。

java.util.concurrent.CyclicBarrier 也是 JDK 1.5 提供的一个同步辅助类(为什么用也呢?参见 再谈 CountDownLatch),它允许一组线程互相等待,直到到达某个临界点(a common barrier point,翻译成公共障碍点、公共栅栏点都不够传神,直接用临界点吧)。在某个程序中,一组固定大小的线程必须互相等待时,CyclicBarrier 将起很大的作用。因为在等待线程被释放后,这个临界点可以重用,所以说是循环的。

CyclicBarrier 支持一个可选的 Runnable,在一组线程中的最后一个线程完成之后、释放所有线程之前,该 Runnable 在屏障点运行一次(每循环一次 Runnable 运行一次)。这种方式可以用来在下一波继续运行的线程运行之前更新共享状态(比如下一波僵尸来之前,检查武器弹药)。

CountDownLatch 与 CyclicBarrier

CountDownLatch 是不能够重复使用的,是一次性的,其锁定一经打开,就不能够在重复使用。就像引线,点燃后就在燃烧减少,燃烧完了就不能再次使用了。CyclicBarrier 是一种循环的方式进行锁定,这次锁定被打开之后,还能够重复计数,再次使用。就像沙漏,这次漏完了,倒过来接着漏。

还有一点是两者之间很大的区别,就是 CountDownLatch 在等待子线程的过程中,会锁定主线程,而 CyclicBarrier 不会锁定主线程,只是在所有子线程结束后,根据定义执行其可选的 Runnable 线程。

所以在这两种辅助类中进行选择时,能够很明显进行区分。

CyclicBarrier 实例

可以考虑这么一种情况,我们需要向数据库导入一些数据,没导入几条希望能进行一次计时,便于我们查看。因为实现比较简单,直接上代码:

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                long end = System.currentTimeMillis();
                System.out.println("导入" + 3 + "条数据,至此总共用时:" + (end - start)
                        + "毫秒");
            }
        });

        for (int i = 0; i < 9; i++) {
            final int threadID = i + 1;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(10));// 模拟业务操作
                        System.out.println(threadID + "完成导入操作。");
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        System.out.println("====主线程结束====");
    }
}

执行结果为:

====主线程结束====
4 完成导入操作。
2 完成导入操作。
1 完成导入操作。
导入 3 条数据,至此总共用时:4006 毫秒
5 完成导入操作。
6 完成导入操作。
8 完成导入操作。
导入 3 条数据,至此总共用时:4007 毫秒
3 完成导入操作。
0 完成导入操作。
7 完成导入操作。
导入 3 条数据,至此总共用时:8006 毫秒

程序没导入 3 条会进行一次计时,统计已经执行的时间。如果 CyclicBarrier 构造函数的数字和 for 循环的次数相等的话,这个就是总共用时。

扩展

考虑一下上面的例子,如果 for 循环的次数不是 CyclicBarrier 监听次数的整数倍,比如是 10,那执行结果将会是:

====主线程结束====
2 完成导入操作。
5 完成导入操作。
4 完成导入操作。
导入 3 条数据,至此总共用时:4005 毫秒
8 完成导入操作。
1 完成导入操作。
3 完成导入操作。
导入 3 条数据,至此总共用时:5005 毫秒
7 完成导入操作。
6 完成导入操作。
0 完成导入操作。
导入 3 条数据,至此总共用时:8005 毫秒
9 完成导入操作。

在打印完“9 完成导入操作。”之后,将一直等待。在这里可以通过 barrier.getNumberWaiting() 查看还差多少个线程达到屏障点。如果出现这种情况,那就需要和 CountDownLatch 配合使用了,当子线程全部执行完,有判断 barrier.getNumberWaiting() 不等于 0,则调用 barrier.reset() 重置。这个时候将会触发 BrokenBarrierException 异常,但是将结束整个过程。

修改的代码如下:

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        final CountDownLatch count = new CountDownLatch(10);
        final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                long end = System.currentTimeMillis();
                System.out.println("导入" + 3 + "条数据,至此总共用时:" + (end - start)
                        + "毫秒");
            }
        });

        for (int i = 0; i < 10; i++) {
            final int threadID = i + 1;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(10));// 模拟业务操作
                        System.out.println(threadID + "完成导入操作。");
                        count.countDown();
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        System.out.println("触发 BrokenBarrierException 异常。");
                    }
                }
            }).start();
        }
        count.await();

        if(barrier.getNumberWaiting() != 0) {
            System.out.println("不是整数倍。都已执行完,重置 CyclicBarrier。");
            barrier.reset();
        }

        System.out.println("====主线程结束====");
    }
}

执行结果为:

3 完成导入操作。
9 完成导入操作。
6 完成导入操作。
导入 3 条数据,至此总共用时:3005 毫秒
8 完成导入操作。
5 完成导入操作。
10 完成导入操作。
导入 3 条数据,至此总共用时:7005 毫秒
1 完成导入操作。
7 完成导入操作。
4 完成导入操作。
2 完成导入操作。
导入 3 条数据,至此总共用时:9005 毫秒
不是整数倍。都已执行完,重置 CyclicBarrier。
====主线程结束====
触发 BrokenBarrierException 异常。

使用 barrier.reset() 进行重置,因为 CyclicBarrier 是一个循环,开头就是结尾,所以重置也可以理解为直接完成。

另外,因为使用了 CountDownLatch,所以主线程会锁定,直到线程通过 count.await() 向下执行。

推荐阅读


你好,我是看山,公众号:看山的小屋,10 年老猿,Apache Storm、WxJava、Cynomys 开源贡献者。游于码界,戏享人生。

个人主页:https://www.howardliu.cn
个人博文:Java 并发基础(四):再谈 CyclicBarrier
CSDN 主页:http://blog.csdn.net/liuxinghao
CSDN 博文:Java 并发基础(四):再谈 CyclicBarrier

公众号:看山的小屋