热衷学习,热衷生活!😄

沉淀、分享、成长,让自己和他人都能有所收获!😄

一、基于AQS实现的锁

AQS(AbstractQueuedSynchronizer) 是Java并发包JUC中非常重要的一个类,大部分锁都是基于AQS实现的,主要实现的类如下:

  • ReentrantLock:可重入锁,独占锁,实现了公平锁和非公平锁,这个是上篇内容介绍的类,也是最常用类,通常会和synchronized作比较。

  • ReentrantReadWriteLock:读写锁,可共享也可独占锁,读是共享锁,写是独占锁,也实现了公平锁和非公平锁。

  • Semaphore:信号锁,共享锁,也实现了公平锁和非公平锁,主要同于控制流量,比如:数据库连接池给你分配10个链接,来一个分配一个,如果10个都分配完了且没有释放那就等待释放。

  • CountDownLatch:闭锁,共享锁,也实现了公平锁和非公平锁,Latch门闩的意思,比如:说四个人一个漂流艇,坐满了就推下水。

二、Semaphore

Semaphore是什么

上面已经介绍了Semaphore,基于AQS实现的信号锁,是共享锁,实现了公平锁和非公平锁。可以用来控制同时访问特定资源的线程数,通过协调各个线程以保证合理的使用资源。

Semaphore使用场景

通常用于资源有明确访问数量限制的场景,常用于限流。

比如:数据库连接池,同时进行连接的线程数量有限制,连接不能超过一定的数量,当连接达到了限制的数量后,后面的线程只能排队等待前面的线程释放了数据库链接才能获取数据库链接。

比如:停车场场景,车位数量有限,同时只能听一定数量的车,当停满了之后外面的车只能等里面的车出来才能进去停车。

Semaphore的常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 从信号锁获取获取一个锁,在获取到锁之前一直处理阻塞状态,除非线程被中断
acquire();

// 从信号锁获取获取指定数量锁,在获取到锁之前一直处理阻塞状态,除非线程被中断
acquire(int permits);

// 从信号锁获取一个锁,在获取到锁之前线程一直处于阻塞状态(忽略中断)
acquireUninterruptibly();

// 尝试从信号锁获取锁,返回获取成功或者失败,不会阻塞线程
tryAcquire();

// 尝试从信好锁获取锁,指定获取时间,在指定时间内没有获取到则超时返回,不会阻塞线程
tryAcquire(long timeount, TimeUnit unit);

// 释放锁
release();

// 获取等待队列中是否还有等待线程
hadQueuedThreads();

// 获取等待队列里阻塞线程的数量
getQueuedLength();

// 清空锁,返回清空锁的数量
drainPermits();

// 返回可用的锁的数量
availabelPermits();

Semaphore实现原理

初始化

Semaphore提供了两个构造方法,默认构造方法创建指定锁数量的非公平信号锁,另外一个构造方法多了一个指定是公平锁还是非公平锁的参数,源码如下:

1
2
3
4
5
6
7
8
9
// 构建指定数量锁的非公平信号锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// 构建指定数量锁的公平/非公平信号锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取锁过程
acquire

acquire()是获取锁方法,调用了内部类同步器Sync继承了AQS实际调用AQSacquireSharedInterruptibly()核心,源码如下:

1
2
3
public void acquire() throws InterruptedExcetion {
sync.acquireSharedInterruptibly();
}

加JDK中,与锁相关的方法,Interruptibly表示可中断,也就是可中断锁。可中断锁的意思是线程在等待获取锁的过程中是可以被中断的,换言之,线程在等待锁的过程中可以响应中断。

acquireSharedInterruptiby

acquireSharedInterruptibly方法是获取可中断锁,源码如下:

1
2
3
4
5
6
7
8
9
10
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
// 检测线程的中断状态,如果已经被中断了,就响应中断,该方法会清除线程中的中断标识
throw new InterruptedException();
// 尝试获取锁,arg为锁的数量
// 当锁被获取完了之后,则为当前线程创建一个节点加入阻塞队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly方法首先会判断当前线程的中断状态如果是中断状态则响应中断,抛异常,然后调用tryAcquireShared()方法获取锁,如果锁被获取完了就为当前线程创建一个节点加入等待队列。

tryAcquireShared

tryAcquireShared()AQS定义的一个模版方法,具体由子类实现,Semaphore也实现了公平锁和非公平锁,两种锁大同小异,我们具体来看一下公平锁的具体实现,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected int tryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 判断是否有排在自己前面的线程,如果有直接返回-1,进入阻塞状态
if (hasQueuedPredecessors())
return -1;
// 获取同步状态的值(当前可用锁数量)
int available = getState();
// 剩余锁数量, 可用的-申请的
int remaining = available - acquires;
// 如果剩余的锁小于0, 或者设置成功就返回,如果设置失败继续循环设置
// 如果剩余锁数量小于0,返回负数,表示获取锁失败
// 如果剩余锁数量大于0,且设置状态成功,表示获取锁成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

tryAcquireShared()通过自旋+CAS的方式获取锁和保证线程安全。

doAcquireSharedInterruptibly

doAcquireSharedInterruptibly()方法在公平锁的时候如果当前线程前面有等待线程或者锁被获取完了之后,当前线程需要进入等待状态时会被调用,用于为当前线程创建节点并加入等待队列,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 为当前线程创建共享模式节点加入队列结尾
final Node node = addWaiter(Node.SHARED);
// 操作失败标记
boolean failed = true;
try {
//自旋
for (;;) {
// 获取当前节点的前一节点
final Node p = node.predecessor();
if (p == head) {
// 如果前一节点是头节点, 则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果获取成功,设置头节点和共享模式传播
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 如果前一节点不是头节点或者没有获取到锁
// shouldParkAfterFailedAcquire方法判断当前线程是否需要被阻塞
// parkAndCheckInterrupt方法用于阻塞线程并检测线程是否被中断,如果被中断抛错
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 自旋异常退出 取消线程获取锁
cancelAcquire(node);
}
}
获取锁过程总结
  1. 首先调用Semaphore.acquire()方法获取令牌,该方法调用内置同步器Sync.acquireSharedInterruptibly()方法,该同步器继承AQS实际调用的是AQSacquireSharedInterruptibly()方法。
  2. acquireSharedInterruptibly()方法首先判断当前线程是否被中断,如果中断了就抛InterruptedException异常,如果没有被中单,就调用tryAcquireShared()方法尝试获取锁。
  3. tryAcquireShared()方法AQS只是定了一个模版方法由子类实现,Semaphore.Sync同步器提供了两种实现,分别是FairSync(公平锁)和NonfairSync(非公平锁),这两种锁实现差不多,公平锁就是多一个hasQueuedPredecessor()方法判断是否有排在自己前面的线程,如果有则返回-1。
  4. tryAcquireShared()方法通过自旋或许锁,先获取当前可用锁,减去需要获取的锁获取到剩余锁,如果剩余锁小于0直接返回,表示获取失败,否则通过CAS去获取锁,成功获取返回成功,失败获取返回失败。
  5. tryAcquireShared()如果获取锁失败,就要调用doAcquireSharedInterruptibly()方法用于为当前线程创建节点加入等待队列,该方法首先创建共享模式的节点加入队列,然后自旋,判断当前节点是不是头节点,如果是头节点也尝试获取锁,获取成功的话设置头节点并成功返回,如果获取失败则会调用shouldParkAfterFailedAcquire()判断当前线程是否需要等待,如果需要等待然后调用parkAndCheckInterrupt()方法阻塞线程并判断线程是否中断,如果中断则抛错,如果没有中断就阻塞再通过自旋获取锁。如果自旋异常退出,则调用cancelAcquire()方法取消线程获取锁。
释放锁过程
release

Semaphore.relese()方法用于释放锁,释放一个锁,源码如下:

1
2
3
4
public void release(){
// 释放一个共享锁
sync.releaseShared(1);
}

release()方法调用Semaphore.Sync同步器的releaseShared()方法,该同步器继承与AQS实际调用的是AQS.releaseShared()方法。

releaseShared

releaseShared()方法释放指定数量的共享锁,释放成功之后会唤醒等待队列中的一个线程,源码如下:

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 尝试释放锁
if (tryReleaseShared(arg)) {
// 释放成功,唤醒等待队列中的线程
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared

tryReleaseShared()方法是AQS定义的模版方法由子类实现,调用了Semaphore.tryReleaseShared(),该方法通过自旋+CAS释放锁,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryReleaseShared(int releases) {
// 自旋
for(;;){
// 获取当前可用的锁数量
int current = getState();
// 可用的+释放的
int next = current + releases;
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 通过CAS修改状态值释放锁
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared

doReleaseShared()方法用于释放锁成功之后唤醒等待队列中的线程,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
// 自旋
for (;;) {
// 将队列头节点赋值与节点h
Node h = head;
// 如果节点不为null,且不等于尾节点
if (h != null && h != tail) {
// 得到h节点的状态
int ws = h.waitStatus;
// 如果节点状态是Node.SIGNAL,就要唤醒节点h下一节点
if (ws == Node.SIGNAL) {
// 设置节点h的状态为取消状态,如果失败就循环再试一次
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒节点h下一节点线程
unparkSuccessor(h);
}
// 如果节点h状态为0,就设置ws的状态为PROPAGATE,说明下次循环的时候节点h应该无条件被传播
// 在shouldParkAfterFailedAcquire方法中使用
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果队列中头节点发生变化就继续循环
// 如果没有发生变化就跳出循环
if (h == head)
break;
}
}
释放锁过程总结
  1. 首先调用Semaphore.release()方法释放锁,该方法调用同步器SyncreleasShared()方法,因为同步器继承与AQS所以实际调用的是AQS.releaseShared()方法。
  2. AQS.releaseShared()方法首先调用tryReleaseShared()方法尝试释放锁,该方法是AQS定义的模版方法,通过子类实现,调用的是Semaphore.tryReleaseShared()方法。
  3. Semaphore.tryReleaseShared()方法通过自旋+CAS释放锁,先获取当前的锁数量加上释放的锁数量,会判断超过判断,然后通过CAS修改锁的数量达到释放锁的目的。tryReleaseShared()释放锁成功之后,会调用AQS.doReleaseShared()唤醒等待队列中的线程。
  4. AQS.doReleaseShared()用于释放锁成功之后唤醒等待队列中的线程,也是通过自旋+CAS实现,首先获取头节点h,先判断节点h不为null且不是尾节点,得到节点h的状态,如果状态是Node.SIGNAL说明这个节点已经要被唤醒应该唤醒下一节点,通过CAS操作设置节点h状态为取消,然后调用unparkSuccessor()方法唤醒下一节点。如果节点h的状态为0,就设置状态为PROPAGATE说明下一次应该被无条件传播。如果队列中头节点发生变化就继续循环,没有发生变化就终止循环。

三、使用Semaphor实现停车场提示牌功能

每个停车场入口都有一个提示牌,上面显示着停车场剩余的车位是多少,当剩余车位为0时,则不允许车辆进入停车场,直到停车场有车离开停车场,这是提示牌显示新的剩余车位数。

业务场景

  1. 停车场容纳量为10。
  2. 当一辆车进入停车场后,显示牌的剩余车位数响应的减1.
  3. 每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
  4. 停车场剩余车位不足时,车辆只能在外面等待。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SemaphoreDemo {

/**
* 停车场同时容纳10辆车
*/
private static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
// 模拟15辆车同时停车
for (int i = 0; i < 15; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("=====" + Thread.currentThread().getName() + "车辆来到停车场");
if (semaphore.availablePermits() == 0) {
// 没有车位了
System.out.println("车位不足,请耐心等待," + Thread.currentThread().getName() + "车辆正在等待");
}
// 获取车位停车
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"成功进入停车场");
//模拟车辆在停车场停留的时间
Thread.sleep(1000);
//驶出停车场
semaphore.release();
System.out.println(Thread.currentThread().getName()+"驶出停车场");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
}

上面代码输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
=====Thread-0车辆来到停车场
=====Thread-3车辆来到停车场
Thread-3成功进入停车场
=====Thread-1车辆来到停车场
=====Thread-2车辆来到停车场
Thread-1成功进入停车场
=====Thread-5车辆来到停车场
=====Thread-4车辆来到停车场
Thread-0成功进入停车场
=====Thread-7车辆来到停车场
Thread-4成功进入停车场
Thread-7成功进入停车场
Thread-5成功进入停车场
=====Thread-6车辆来到停车场
Thread-2成功进入停车场
Thread-6成功进入停车场
=====Thread-8车辆来到停车场
Thread-8成功进入停车场
=====Thread-10车辆来到停车场
Thread-10成功进入停车场
=====Thread-9车辆来到停车场
=====Thread-14车辆来到停车场
车位不足,请耐心等待,Thread-14车辆正在等待
=====Thread-12车辆来到停车场
车位不足,请耐心等待,Thread-12车辆正在等待
=====Thread-11车辆来到停车场
车位不足,请耐心等待,Thread-11车辆正在等待
=====Thread-13车辆来到停车场
车位不足,请耐心等待,Thread-9车辆正在等待
车位不足,请耐心等待,Thread-13车辆正在等待
Thread-14成功进入停车场
Thread-10驶出停车场
Thread-2驶出停车场
Thread-6驶出停车场
Thread-11成功进入停车场
Thread-7驶出停车场
Thread-3驶出停车场
Thread-9成功进入停车场
Thread-13成功进入停车场
Thread-1驶出停车场
Thread-0驶出停车场
Thread-4驶出停车场
Thread-5驶出停车场
Thread-8驶出停车场
Thread-12成功进入停车场
Thread-13驶出停车场
Thread-9驶出停车场
Thread-12驶出停车场
Thread-14驶出停车场
Thread-11驶出停车场

从上面输出可以看出,当10个车位被停满了之后,再进来的5辆车进入等待状态直到有车驶出停车场,然后再停车,达到了我们预期的效果。