CountDownLatch、CyclicBarrier与Semaphore

Posted by RussXia on May 20, 2019

CountDownLatch

CountDownLatch可以实现多线程之间的计数功能,并实现了阻塞功能。其内部通过实现了AQS的java.util.concurrent.CountDownLatch.Sync保证线程之间同步修改count值。

在构造CountDownLatch的时候,传入计数值,并在构造函数内完成sync的初始化。

 public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

CountDownLatch提供了两个核心的方法,countDownawait()。每次调用countDown(),count值都会减1,当count值减为0时,所有await的线程从等待中恢复。

private static class Printer implements Runnable {
    private final CountDownLatch count;

    private Printer(CountDownLatch count) {
        this.count = count;
    }

    public void run() {
        //do something
        System.out.println(Thread.currentThread().getName() + " ready");
        count.countDown();
    }
}

public static void main(String[] args) throws InterruptedException {
    CountDownLatch count = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        //开启10个工作线程,执行完任务后调用countdown方法
        new Thread(new Printer(count), "work_thread-" + i).start();
    }
    new Thread(() -> {
        try {
             //阻塞线程,等待结束信号变为0时进行工作
            count.await();
            System.out.println(Thread.currentThread().getName() + ":-- all ready");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "personal-thread").start();
    //阻塞线程,等待结束信号变为0时进行工作
    count.await();
    System.out.println(Thread.currentThread().getName() + ":所有线程准备开始!");
    Thread.sleep(2000);
}

CyclicBarrier

CyclicBarrier可以实现类似栅栏的效果:让一组线程共同等待,触发栅栏预设的值parties后,同时执行。 CyclicBarrier内部使用ReentrantLock保证线程之间的同步修改栅栏值parties。

/**
  * parties是指定的栅栏值,就是必须调用await()方法的线程的数目
  * barrierAction是触发了parties阈值后会执行的任务(由最后到达的线程执行)  
  */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

下面是CyclicBarrier中await源码

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //一个分代的屏障标示,内部只有一个broken字段,如果为true表示屏障被突破
        //不同代的CyclicBarrier的Gereration对象不同(每次都会重新new)
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();
        //如果当前线程已被中断,破坏屏障,唤醒所有await的线程,当前线程抛出InterruptedException
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //需要等待进入屏障的线程数量-1
        int index = --count;
        if (index == 0) {  // tripped,减少到0,达到阈值
            boolean ranAction = false;
            try {
                //如果构造时有指定触发的任务,执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //触发所有await的线程,然后重置CyclicBarrier(可复用)
                nextGeneration();
                return 0;
            } finally {
                //如果ranAction异常的没有为true,突破屏障
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed) //如果没有设置超时时间,直接await
                    trip.await();
                else if (nanos > 0L)    //等待至超时时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {    //等于当前代,且屏障没有被破坏,破坏屏障
                    breakBarrier();
                    throw ie;
                } else {    //不等于当前代,或者屏障已经被破坏
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt(); //中断当前线程
                }
            }
            //屏障被破坏,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            //已经不是当前代了,返回索引
            if (g != generation)
                return index;
            //设置超时且超时了,破坏屏障,抛出超时异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        //释放锁
        lock.unlock();
    }
}

CyclicBarrier使用示例:

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    //任务A
    executorService.submit(() -> {
        try {
            System.out.println("Task A step 1");
            cyclicBarrier.await();
            System.out.println("Task A step 2");
            cyclicBarrier.await();
            System.out.println("Task A step 3");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    //任务B
    executorService.submit(() -> {
        try {
            System.out.println("Task B step 1");
            cyclicBarrier.await();
            System.out.println("Task B step 2");
            cyclicBarrier.await();
            System.out.println("Task B step 3");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdown();
}   

输出:

Task A step 1
Task B step 1
Task B step 2
Task A step 2
Task A step 3
Task B step 3

Semaphore

Semaphore的意思是信号量,我们可以用Semaphore实现对特定资源的允许线程同时访问数量的控制。

Semaphore类似于令牌,信号量有限,信号量被获取光了,只能等待其他线程释放信号量。Semaphore有两个关键的方法acquire获取信号量许可,release释放信号量许可。其内部还是使用的基于AQS实现的同步控制。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    final Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 5; i++) {
        executorService.submit(() -> {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " acquire permit");
                System.out.println(Thread.currentThread().getName() + ":permit remains " + semaphore.availablePermits());
                Thread.sleep(1000);
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " release permit");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executorService.shutdown();
}

控制台输出:

pool-1-thread-2 acquire permit
pool-1-thread-2:permit remains 0        //这里是pool-1-thread-2和pool-1-thread-1并发了
pool-1-thread-1 acquire permit
pool-1-thread-1:permit remains 0
pool-1-thread-2 release permit
pool-1-thread-1 release permit          //释放完,此时还有两个信号量,
pool-1-thread-3 acquire permit
pool-1-thread-3:permit remains 1
pool-1-thread-4 acquire permit
pool-1-thread-4:permit remains 0        //线程3,4获取完信号量
pool-1-thread-3 release permit
pool-1-thread-5 acquire permit          //线程5获取到刚刚线程3释放的信号量
pool-1-thread-4 release permit          //线程4释放信号量
pool-1-thread-5:permit remains 1        //此时还有一个线程4释放的信号量
pool-1-thread-5 release permit

限流的三种常见算法

常见的三种限流算法:

  • 令牌桶限流:令牌痛容量固定,按照固定速度往里面添加令牌(满了就丢弃),如果令牌桶重令牌数量充足,请求可以被处理,否则拒绝新请求。
  • 漏斗限流:漏斗按固定速度流出,当流入的请求数量达到漏斗的上限时,请求被拒绝。
  • 计数器限流:有时我们也可以用计数器来控制一定时间内的总并发数。