Semaphore
Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目。应用场景:资源访问,服务限流。
Semaphore 实现AbstractQueuedSynchronizer的方法与 ReentrantLock一样
Semaphore构造方法
public Semaphore(int permits) {------permits 表示能同时有多少个线程访问我们的资源
sync = new NonfairSync(permits); -------------默认创建的是非公平锁。
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);-------传入的permits做i为了state的值,作为资源总数
}
semaphore.acquire();获取资源,源码实现
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);---------每次申请一次资源
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();-----------------线程无效直接抛异常
if (tryAcquireShared(arg) < 0) --------------------拿不到资源,需要进行入队操作
doAcquireSharedInterruptibly(arg); ---------入队操作
}
final int nonfairTryAcquireShared(int acquires) { --------获取资源的操作
for (;;) {
int available = getState(); --------------拿到现有的资源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) -----------原子操作,多线程情况下会可能失败,所以无线循环自旋下去,直到成功;
return remaining;---------------------如果大于等于0那么就是拿到了资源,如果小于0,那么线程就要进入等待队列
}
}
为什么要用死循环----compareAndSetState这个是cas原子操作,失败之后要循环重复继续操作,直到成功。死循环也就结束了。
private void doAcquireSharedInterruptibly(int arg)-------------线程入队操作
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);---------------注意这里是以共享的方式入队
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { --------新入队的节点会判断他上一个节点是不是头节点,如果是头节点会再次尝试获取资源,
int r = tryAcquireShared(arg);
if (r >= 0) { -------------如果获取到资源,那么这个阻塞队列就要清空了,里面没有在等待的线程了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && ----------如果获取不到资源,那么就要线程阻塞了
parkAndCheckInterrupt()) -----------parkAndCheckInterrupt这个方法会将线程阻塞(挂起),线程都阻塞了,这个死循环就不会执行了,这也就是为什么juc源码写了很多
死循环都没问题地原因,我们可以借鉴。当线程被唤醒之后又开始这个死循环,尝试拿资源(非公平锁有可能拿不到),
拿不到再次被阻塞挂起。
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { ---------------判断线程能否被正常阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) -----------------如果上一个节点是有效的在等待的线程,那么该线程就可以插入到队列后面
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { -----------如果上一个节点是无效的,那就查找上上个节点是不是有效的,直到找到那个有效的节点,然后将该节点插入到那个有效节点后面,中间的无效节点从链表中删除,后面的节点要找前面
的节点这也就说明了为什么我们地等待队列要设计成双链表,不光有next。next这种找后驱节点地操作还有pre .pre这样前驱节点。所以需要双链表。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
semaphore.release();释放资源,源码分析
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases; -----------获取当前的资源然后给资源加回去
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) -----------------CAS算法还资源,死循环,直到成功还回去,死循环结束。
return true;
}
}
资源还回去之后执行
doReleaseShared方法唤醒其他线程抢资源
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) { --------发现阻塞队列有阻塞线程
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); ---------跳过头节点,唤醒下一个节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; ------循环找到waitStatus<0能唤醒的节点调用unpark方法唤醒线程。
}
if (s != null)
LockSupport.unpark(s.thread);
}
CountDownLatch是什么?
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch如何工作?
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
API
CountDownLatch.countDown()分线程执行------ 线程执行完任务之后处于等待状态
CountDownLatch.await()? 主线程执行 -------监控所有线程,所有线程结束之后主线程继续走下去。
CountDownLatch 不可重用
CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
下一篇 top命令查看进程信息详解