/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) thrownew NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
/** * Main execution method for delayed or periodic tasks. If pool * is shut down, rejects the task. Otherwise adds task to queue * and starts a thread, if necessary, to run it. (We cannot * prestart the thread to run the task because the task (probably) * shouldn't be run yet.) If the pool is shut down while the task * is being added, cancel and remove it if required by state and * run-after-shutdown parameters. * * @param task the task */ privatevoiddelayedExecute(RunnableScheduledFuture<?> task){ if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take()throws InterruptedException { final ReentrantLock lock = this.lock; // 1 获取lock锁 lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 2.1:如果PriorityQueue为空,则当前Condition available等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 2.3.1 返回PriorityQueue中的头元素 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 2.2 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //2.3.2 唤醒Condition available等待的线程 available.signal(); //3 lock.unlock(); } }
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ publicbooleanoffer(E e){ final ReentrantLock lock = this.lock; lock.lock(); //1 try { q.offer(e); //2.1 if (q.peek() == e) { leader = null; // 2.2如果在上面2.1中添加的任务是PriorityQueue的头元素,唤醒在Condition中等待的所有线程。 available.signal(); } returntrue; } finally { lock.unlock();//3 } }