引言

JDK并发包中提供了CountDownLatch、CyclicBarrier、Semaphore工具类来实现并发流程的控制。 Exchanger提供了线程间交换数据的方法。

join()方法

需求:实现主程序要等待其他线程完成后,在继续执行。很容易想到使用join()方法来实现。

public class JoinCountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            System.out.println("执行1");
        });
        Thread thread2 = new Thread(() -> {
            System.out.println("执行2");
        });

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
        System.out.println("主线程");
    }
}

/* 输出结果:
执行1
执行2
主线程
*/

join()原理是不停的检查join线程是否存活,直到join线程都中止,线程才会this.notifyAll()。

CountDownLatch类

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread thread1 = new Thread(() -> {
            System.out.println("执行1");
            countDownLatch.countDown();
        });

        Thread thread2 = new Thread(() -> {
            System.out.println("执行2");
            countDownLatch.countDown();  
        });
        thread1.start();
        thread2.start();

        countDownLatch.await(10, TimeUnit.MILLISECONDS);

        System.out.println("主线程");
    }
}

说明:

CountDownLatch类的构造函数需要传入数字N,表示你想等待N个点(N个线程或N个步骤)。上述代码中, 传入的N为2,则表示主线程要等待两个点后,才会继续执行主程序,在此之前程序会一直阻塞在countDownLatch.await()方法这, (上述程序的await方法带一个指定时间,只会阻塞到指定时间)。其中调用countDown()函数时,N会减1,当N=0时, 结束阻塞。

若将上述类中的方法该成如下代码,则执行1,执行2会在主线程之前打印,但是等待i不一定会在主线程之前打印, 想要实现,需要将构造中的N改为3,在for循环后面调用countDown()方法。

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Thread thread1 = new Thread(() -> {
            System.out.println("执行1");
            countDownLatch.countDown();
        });

        Thread thread2 = new Thread(() -> {
            System.out.println("执行2");
            countDownLatch.countDown();
            for(int i = 0; i < 1000; i++) {
                System.out.println("等待"+ (i+1));
            }
//            countDownLatch.countDown();
        });
        thread1.start();
        thread2.start();

        countDownLatch.await();

        System.out.println("主线程");
    }

CyclicBarrier类

public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        // 若将3改为4,程序会一直等待,没有4个线程到达屏障
//        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

        // 若将3改为4,程序会一直等待,没有4个线程到达屏障, 优先执行当任务也不会执行
        CyclicBarrier cyclicBarrier2 = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("优先执行");
            }
        });

        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("到达屏障前");
            }
            try {
                cyclicBarrier.await();
                cyclicBarrier2.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行1");
        });

        Thread thread2 = new Thread(() -> {
            try {
                cyclicBarrier.await();
                cyclicBarrier2.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行2");
        });

        thread1.start();
        thread2.start();

        try {
            cyclicBarrier.await();
            cyclicBarrier2.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        Thread.sleep(1000);

        System.out.println("主线程");
    }
}

CyclicBarrier让一组线程到达一个屏障时被阻塞,例如,上述代码若将屏障数设置为4,则只会打印到达屏障前, 然后就会3个线程都会被阻塞。在CyclicBarrier的构造函数中也可以添加一个任务,到线程数达到屏障要求时,线程 继续执行,并且构造函数中的会优先执行。

实例

计算银行的账单流水,使用多个线程进行计算,然后进行汇总。

public class BankWaterService implements Runnable{

    // 4个屏障处理完成,执行当前类当run方法
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);

    private Executor executor = Executors.newFixedThreadPool(4);

    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    @Override
    public void run() {
        int result = 0;
        for(Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }

        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    private void count(){
        for (int i = 0; i < 4; i++) {
            executor.execute(()->{
                sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                System.out.println(Thread.currentThread().getName() + 1);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }

    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

说明:

首先屏障设置为4,优先执行的任务传入this,表示当4个线程到达后会优先执行类中的run()方法。对于4个线程 到计算完自己的流水后到达屏障阻塞,到4个线程都计算完毕后,会优先执行本类中的run()方法,从而完成了对于 流水的统计。

注:

CyclicBarrier的计数器可以使用reset()方法重置,这正好解释了Cyclic循环的意思。

Semaphore类

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(()->{
                try {
                    s.acquire();
                    System.out.println("save data");
                    s.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        threadPool.shutdown();
    }
}

Semaphore类可以控制同时访问特定资源的线程数量。例如上述代码有30个线程,但是只能有10个线程来访问数据库(打印save data)。

Exchanger类

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(()->{
            String A = "流水A";
            try {
                String B = exgr.exchange(A + Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName()+A.equals(B)+ " A: " + A + " B: " + B);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPool.execute(()->{
            String B = "流水B";
            try {
                String A = exgr.exchange(B+ Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName()+A.equals(B)+ " A: " + A + " B: " + B);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPool.shutdown();
    }
}

可以控制两个线程交换彼此数据。如果两个线程有一个exchange()方法没有执行,则会一直等待。exchange()方法里也可以设置最大等待时间。

参考:java并发编程艺术