/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protectedfinalintgetState(){ return state; }
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protectedfinalvoidsetState(int newState){ state = newState; }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protectedfinalbooleancompareAndSetState(int expect, int update){ // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
需要记录当前是哪个线程持有锁。
1 2 3 4 5
/** * The current owner of exclusive mode synchronization. * 记录锁被哪个线程持有 */ privatetransient Thread exclusiveOwnerThread;
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ privatetransientvolatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ privatetransientvolatile Node tail;
/** * Creates and enqueues node for current thread and given mode. * 把当前线程封装成Node,然后把Node放入双向链表的尾部。 * 注意:只是把当前线程放入队列,线程本身并未阻塞 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode){ // 用当前线程构造一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的还是共享的 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 将目前队列中尾部节点给pred Node pred = tail; // 队列不为空的时候 if (pred != null) { node.prev = pred; // 尝试加入队尾,失败之后执行enq(node) // 先尝试通过AQS方式修改尾节点为最新的节点,如果修改失败,意味着有并发, if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //第一次尝试添加尾部失败说明有并发,此时进入自旋 enq(node); return node; }
/** * Attempts to acquire in exclusive mode, aborting if interrupted, * and failing if the given timeout elapses. Implemented by first * checking interrupt status, then invoking at least once {@link * #tryAcquire}, returning on success. Otherwise, the thread is * queued, possibly repeatedly blocking and unblocking, invoking * {@link #tryAcquire} until success or the thread is interrupted * or the timeout elapses. This method can be used to implement * method {@link Lock#tryLock(long, TimeUnit)}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @param nanosTimeout the maximum number of nanoseconds to wait * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ publicfinalbooleantryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
/** * Acquires in exclusive timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ privatebooleandoAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) returnfalse; finallong deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; returntrue; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) returnfalse; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) thrownew InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
共享式获取
acquireShared
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ publicfinalvoidacquireShared(int arg){ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
/** * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * * <p>The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protectedinttryAcquireShared(int arg){ thrownew UnsupportedOperationException(); }
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ publicfinalvoidacquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); // tryAcquireShared由具体的子类实现 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * Acquires in shared interruptible mode. * @param arg the acquire argument */ privatevoiddoAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) thrownew InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/** * Attempts to acquire in shared mode, aborting if interrupted, and * failing if the given timeout elapses. Implemented by first * checking interrupt status, then invoking at least once {@link * #tryAcquireShared}, returning on success. Otherwise, the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted or the timeout elapses. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. * @param nanosTimeout the maximum number of nanoseconds to wait * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ publicfinalbooleantryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
/** * Acquires in shared timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ privatebooleandoAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) returnfalse; finallong deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; returntrue; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) returnfalse; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) thrownew InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg){ // 释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒队列中的后继者 unparkSuccessor(h); returntrue; } returnfalse; }
/** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protectedbooleantryRelease(int arg){ thrownew UnsupportedOperationException(); }
共享式释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ publicfinalbooleanreleaseShared(int arg){ if (tryReleaseShared(arg)) {// 由具体子类实现 doReleaseShared();//一次性唤醒队列中所有阻塞的线程 returntrue; } returnfalse; }
/** * Attempts to set the state to reflect a release in shared mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this release of shared mode may permit a * waiting acquire (shared or exclusive) to succeed; and * {@code false} otherwise * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protectedbooleantryReleaseShared(int arg){ thrownew UnsupportedOperationException(); }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
/** waitStatus value to indicate thread has cancelled */ staticfinalint CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalint SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalint CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalint PROPAGATE = -3;
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatileint waitStatus;
/** 前置节点 */ volatile Node prev;
/** 后置节点 */ volatile Node next;
/** * The thread that enqueued this node. Initialized on * construction and nulled out after use. * 获取锁失败的线程保存在Node节点中。 */ volatile Thread thread;
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. * 当我们调用了Condition后他也有一个等待队列 */ Node nextWaiter;
/** * Returns true if node is waiting in shared mode. */ finalbooleanisShared(){ return nextWaiter == SHARED; }
/** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor()throws NullPointerException { Node p = prev; if (p == null) thrownew NullPointerException(); else return p; } }
staticfinalclassFairSyncextendsSync{ /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protectedfinalbooleantryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 只有当c==0(没有线程持有锁),并且排在队列的第1个时(即当队列中没有其他线程的时候), // 才去抢锁,否则继续排队 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; } }
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protectedfinalbooleantryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 只有当c==0(没有线程持有锁),并且排在队列的第1个时(即当队列中没有其他线程的时候), // 才去抢锁,否则继续排队 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ // 获得前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 2. 返回true,并且执行parkAndCheckInterrupt returntrue; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 3. 当pred所维护的获取请求被取消时(也就是node.waitStatus = CANCELLED, // 这时就会循环移除所有被取消的前继节点pred,直到找到未被取消的pred。 // 移除所有被取消的前继节点后,直接返回false do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 1. addWaiter方法调用时执行该方法(初次设置),设置为SIGNAL,并且返回false,继续执行acquireQueued compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获得头节点状态 int ws = node.waitStatus; if (ws < 0) // 如果头节点装小于0 则将其置为0 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 这个是新的头节点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 如果新头节点不满足要求 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 从队列尾部开始往前去找最前面的一个waitStatus小于0的节点 if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒后继节点对应的线程 LockSupport.unpark(s.thread); }