Ja He
Ja He's Blog

Ja He's Blog

Java Semaphore使用及原理介绍

Java Semaphore使用及原理介绍

Java semaphore usage and source code introduction

Ja He's photo
Ja He
·Jan 25, 2022·

2 min read

Java Semaphore简介

信号量(Semaphore)的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明,较为广泛地应用于计算机操作系统进程通信,其核心原理是维护一个计数值,通过等待(wait)或者释放(release)操作来减少或者增加计数值。当计数值为零时,等待(wait)操作需要等到该计数值重新大于0时才能成功。

不同的编程语言一般都有Semaphore的实现,Java语言中Semaphore是concurrent包中提供的一个并发控制工具,其提供了一套许可的机制来实现并发访问资源控制。Semaphore保证这些操作的线程安全,一般较为适合用在有限资源并发管理的场景,下面是常见的两种用法。

  • Binary Semaphore

    这个用法即只有两个状态的信号量,该用法类似于排它锁。但Binary semaphore存在一个很大的问题在于其没有Ownership的概念,也就意味着A线程的Permit可以被B线程Release,所以如果直接将其用作排它锁,则可能会出现并发安全问题。 但另一方面,这个机制可能会在特定场景有用武之地,例如你想实现死锁恢复的场景。

  • Count Semaphores

    拥有多个permit并允许多个线程同时持有permit。其具体用法比较多样,可以用来进行资源管理或模拟CountDownLatch等。

    /** 模拟资源管理*/
    class Pool {
      private final Semaphore available = new Semaphore(0, true);
      private ConcurrentLinkedQueue<Object> poolObjects = new ConcurrentLinkedQueue<>();
      public Object getItem() throws InterruptedException {
          available.acquire();
          return poolObjects.poll();
      }
      public void putItem(Object x) {
          poolObjects.offer(x);
          available.release();
      }
    }
    
      /** 模拟CountDownLatch典型用法*/
      public void SemaphoreCountDown() throws Exception{
          Semaphore semaphore = new Semaphore(0);
          new Thread(() -> doXXX(semaphore)).start();
          new Thread(() -> doXXX(semaphore)).start();
          new Thread(() -> doXXX(semaphore)).start();
          semaphore.acquire(3);
      }
      private void doXXX(Semaphore semaphore) {
          /*do something*/
          semaphore.release();
      }
    
      /** 模拟CyclicBarrier典型用法*/
      public void SemaphoreCountUp() throws Exception{
          Semaphore semaphore = new Semaphore(0);
          new Thread(() -> doXXX(semaphore)).start();
          new Thread(() -> doXXX(semaphore)).start();
          new Thread(() -> doXXX(semaphore)).start();
          semaphore.release(3);
      }
    
      private void doXXX(Semaphore semaphore) {
          try {
              /*do something*/
              semaphore.acquire();
              /*do something in the same time*/
          } catch (InterruptedException e) {}
      }
    

Semaphore原理

字段、方法简介

Semaphore类的字段主要包括一个Sync类的对象、公平策略配置fair以及排队线程集合和长度。Sync是Semaphore内部类,其继承了AQS(AbstractQueuedSynchronizer),提供了利用Unsafe类的CAS操作来进行队列同步的能力。Sync下有两个子类:FairSync、NonfairSync,这个两个类是公平策略配置的具体实现。

类的方法主要分两种,一种是获取Permit的acquire类相关方法,一种是release类相关方法。acquire相关方法一般会阻塞,可设置阻塞时间和是否可被interrupt,如果需要非阻塞可使用tryAcquire相关方法,但要注意tryAquire()方法强制使用非公平策略。其他还有permit相关的方法如获取当前可用许可及直接操作许可。 semaphore.png

所有的acquire类和release相关方法内部都会调用到持有的sync类对象。

Sync类介绍

当创建一个Semaphore实际会创建一个Sync,前面介绍了Sync类继承AQS,会将permits数量设置为AQS的state。那么Sync类是如何实现信号量机制、FairSync和NonFairSync是如何实现公平和非公平策略的呢?

acquire()方法为例,来了解下获取permit的流程。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquire()方法会调用到AQS的acquireSharedInterruptibly方法,该方法主要做了两个事:尝试获取及失败排队。尝试获取主要是调用tryAcquireShared方法。这个方法在FairSync和NonFairSync有不同的实现。先来看FairSync的实现:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

可以看到其先会调用hasQueuedPredecessors判断是否存在已经在等待permit的线程。如果有就返回-1表示尝试获取失败。如果没有等待的线程且剩余permit数量大于尝试获取的数量,则后续通过unsafe的CAS操作更新state。最后返回剩余的permit数量。再对比下NonFairSync的实现:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

可以看到,NonFairSync没有hasQueuedPredecessors的判断,当前线程可能会在AQS中等待线程之前获取到permit,这样FairSync和NonFairSync便实现了公平和非公平策略。

当tryAcquireShared尝试获取失败后,则会调用AQS的doAcquireSharedInterruptibly方法进行排队等待。该方法实现了排队、park和interrupt相关功能,最后也会通过tryAcquireShared来获取permit。

到这里你会发现,acquire调用的AQS方法中都含有Shared,这是因为Semaphore实际就是共享的模式,允许多个线程同时去请求permit。

现在你应该对Semaphore的基本用法和原理有初步了解,但很多核心的功能是由AQS来实现的,下一篇文章我们再继续深入了解下AQS。

 
Share this