Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DF4DF200C16 for ; Thu, 9 Feb 2017 23:18:12 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DDC6C160B50; Thu, 9 Feb 2017 22:18:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0B40F160B4B for ; Thu, 9 Feb 2017 23:18:11 +0100 (CET) Received: (qmail 65560 invoked by uid 500); 9 Feb 2017 22:18:11 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 65551 invoked by uid 99); 9 Feb 2017 22:18:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Feb 2017 22:18:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3BE7DFBDB; Thu, 9 Feb 2017 22:18:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kihwal@apache.org To: common-commits@hadoop.apache.org Message-Id: <05090232aace48a88dc663a5a330be2b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp. Date: Thu, 9 Feb 2017 22:18:10 +0000 (UTC) archived-at: Thu, 09 Feb 2017 22:18:13 -0000 Repository: hadoop Updated Branches: refs/heads/trunk 9b8505358 -> 0c01cf579 HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c01cf57 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c01cf57 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c01cf57 Branch: refs/heads/trunk Commit: 0c01cf57987bcc7a17154a3538960b67f625a9e5 Parents: 9b85053 Author: Kihwal Lee Authored: Thu Feb 9 16:17:24 2017 -0600 Committer: Kihwal Lee Committed: Thu Feb 9 16:17:24 2017 -0600 ---------------------------------------------------------------------- .../org/apache/hadoop/ipc/FairCallQueue.java | 167 ++++++------------- 1 file changed, 51 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c01cf57/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 77a9d65..820f24c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -27,8 +27,7 @@ import java.util.AbstractQueue; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,16 +54,15 @@ public class FairCallQueue extends AbstractQueue /* The queues */ private final ArrayList> queues; - /* Read locks */ - private final ReentrantLock takeLock = new ReentrantLock(); - private final Condition notEmpty = takeLock.newCondition(); + /* Track available permits for scheduled objects. All methods that will + * mutate a subqueue must acquire or release a permit on the semaphore. + * A semaphore is much faster than an exclusive lock because producers do + * not contend with consumers and consumers do not block other consumers + * while polling. + */ + private final Semaphore semaphore = new Semaphore(0); private void signalNotEmpty() { - takeLock.lock(); - try { - notEmpty.signal(); - } finally { - takeLock.unlock(); - } + semaphore.release(); } /* Multiplexer picks which queue to draw from */ @@ -112,28 +110,25 @@ public class FairCallQueue extends AbstractQueue } /** - * Returns the first non-empty queue with equal to startIdx, or - * or scans from highest to lowest priority queue. + * Returns an element first non-empty queue equal to the priority returned + * by the multiplexer or scans from highest to lowest priority queue. + * + * Caller must always acquire a semaphore permit before invoking. * - * @param startIdx the queue number to start searching at * @return the first non-empty queue with less priority, or null if * everything was empty */ - private BlockingQueue getFirstNonEmptyQueue(int startIdx) { - BlockingQueue queue = this.queues.get(startIdx); - if (queue.size() != 0) { - return queue; - } - final int numQueues = this.queues.size(); - for(int i=0; i < numQueues; i++) { - queue = this.queues.get(i); - if (queue.size() != 0) { - return queue; + private E removeNextElement() { + int priority = multiplexer.getAndAdvanceCurrentIndex(); + E e = queues.get(priority).poll(); + if (e == null) { + for (int idx = 0; e == null && idx < queues.size(); idx++) { + e = queues.get(idx).poll(); } } - - // All queues were empty - return null; + // guaranteed to find an element if caller acquired permit. + assert e != null : "consumer didn't acquire semaphore!"; + return e; } /* AbstractQueue and BlockingQueue methods */ @@ -184,9 +179,9 @@ public class FairCallQueue extends AbstractQueue int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e, timeout, unit); - - signalNotEmpty(); - + if (ret) { + signalNotEmpty(); + } return ret; } @@ -195,72 +190,21 @@ public class FairCallQueue extends AbstractQueue int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e); - - signalNotEmpty(); - + if (ret) { + signalNotEmpty(); + } return ret; } @Override public E take() throws InterruptedException { - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - takeLock.lockInterruptibly(); - try { - // Wait while queue is empty - for (;;) { - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q != null) { - // Got queue, so return if we can poll out an object - E e = q.poll(); - if (e != null) { - return e; - } - } - - notEmpty.await(); - } - } finally { - takeLock.unlock(); - } + semaphore.acquire(); + return removeNextElement(); } @Override - public E poll(long timeout, TimeUnit unit) - throws InterruptedException { - - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - long nanos = unit.toNanos(timeout); - takeLock.lockInterruptibly(); - try { - for (;;) { - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q != null) { - E e = q.poll(); - if (e != null) { - // Escape condition: there might be something available - return e; - } - } - - if (nanos <= 0) { - // Wait has elapsed - return null; - } - - try { - // Now wait on the condition for a bit. If we get - // spuriously awoken we'll re-loop - nanos = notEmpty.awaitNanos(nanos); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; - } - } - } finally { - takeLock.unlock(); - } + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null; } /** @@ -269,15 +213,7 @@ public class FairCallQueue extends AbstractQueue */ @Override public E poll() { - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q == null) { - return null; // everything is empty - } - - // Delegate to the sub-queue's poll, which could still return null - return q.poll(); + return semaphore.tryAcquire() ? removeNextElement() : null; } /** @@ -285,12 +221,11 @@ public class FairCallQueue extends AbstractQueue */ @Override public E peek() { - BlockingQueue q = this.getFirstNonEmptyQueue(0); - if (q == null) { - return null; - } else { - return q.peek(); + E e = null; + for (int i=0; e == null && i < queues.size(); i++) { + e = queues.get(i).peek(); } + return e; } /** @@ -301,11 +236,7 @@ public class FairCallQueue extends AbstractQueue */ @Override public int size() { - int size = 0; - for (BlockingQueue q : this.queues) { - size += q.size(); - } - return size; + return semaphore.availablePermits(); } /** @@ -324,20 +255,24 @@ public class FairCallQueue extends AbstractQueue */ @Override public int drainTo(Collection c, int maxElements) { - int sum = 0; - for (BlockingQueue q : this.queues) { - sum += q.drainTo(c, maxElements); + // initially take all permits to stop consumers from modifying queues + // while draining. will restore any excess when done draining. + final int permits = semaphore.drainPermits(); + final int numElements = Math.min(maxElements, permits); + int numRemaining = numElements; + for (int i=0; numRemaining > 0 && i < queues.size(); i++) { + numRemaining -= queues.get(i).drainTo(c, numRemaining); } - return sum; + int drained = numElements - numRemaining; + if (permits > drained) { // restore unused permits. + semaphore.release(permits - drained); + } + return drained; } @Override public int drainTo(Collection c) { - int sum = 0; - for (BlockingQueue q : this.queues) { - sum += q.drainTo(c); - } - return sum; + return drainTo(c, Integer.MAX_VALUE); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org