hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kih...@apache.org
Subject hadoop git commit: HADOOP-14032. Reduce fair call queue priority inversion. Contributed by Daryn Sharp.
Date Thu, 09 Feb 2017 16:08:29 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk a8a594b4c -> a0bfb4150


HADOOP-14032. Reduce fair call queue priority inversion. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a0bfb415
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a0bfb415
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a0bfb415

Branch: refs/heads/trunk
Commit: a0bfb4150464013a618f30c2e38d88fc6de11ad2
Parents: a8a594b
Author: Kihwal Lee <kihwal@apache.org>
Authored: Thu Feb 9 10:04:28 2017 -0600
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Thu Feb 9 10:04:28 2017 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/FairCallQueue.java    | 12 +++--
 .../apache/hadoop/ipc/TestFairCallQueue.java    | 57 +++++++++++++++++++-
 2 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0bfb415/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index c2d3cd8..77a9d65 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -112,19 +112,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   }
 
   /**
-   * Returns the first non-empty queue with equal or lesser priority
-   * than <i>startIdx</i>. Wraps around, searching a maximum of N
-   * queues, where N is this.queues.size().
+   * Returns the first non-empty queue with equal to <i>startIdx</i>, or
+   * or scans from highest to lowest priority queue.
    *
    * @param startIdx the queue number to start searching at
    * @return the first non-empty queue with less priority, or null if
    * everything was empty
    */
   private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
+    BlockingQueue<E> queue = this.queues.get(startIdx);
+    if (queue.size() != 0) {
+      return queue;
+    }
     final int numQueues = this.queues.size();
     for(int i=0; i < numQueues; i++) {
-      int idx = (i + startIdx) % numQueues; // offset and wrap around
-      BlockingQueue<E> queue = this.queues.get(idx);
+      queue = this.queues.get(i);
       if (queue.size() != 0) {
         return queue;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a0bfb415/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 96dea80..901a771 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -28,9 +28,12 @@ import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
-
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 
 public class TestFairCallQueue extends TestCase {
@@ -43,6 +46,7 @@ public class TestFairCallQueue extends TestCase {
     when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
     when(mockCall.getPriorityLevel()).thenReturn(priority);
+    when(mockCall.toString()).thenReturn("id=" + id + " priority=" + priority);
 
     return mockCall;
   }
@@ -78,6 +82,57 @@ public class TestFairCallQueue extends TestCase {
     assertEquals(fairCallQueue.remainingCapacity(), 1025);
   }
 
+  @Test
+  public void testPrioritization() {
+    int numQueues = 10;
+    Configuration conf = new Configuration();
+    fcq = new FairCallQueue<Schedulable>(numQueues, numQueues, "ns", conf);
+
+    //Schedulable[] calls = new Schedulable[numCalls];
+    List<Schedulable> calls = new ArrayList<>();
+    for (int i=0; i < numQueues; i++) {
+      Schedulable call = mockCall("u", i);
+      calls.add(call);
+      fcq.add(call);
+    }
+
+    final AtomicInteger currentIndex = new AtomicInteger();
+    fcq.setMultiplexer(new RpcMultiplexer(){
+      @Override
+      public int getAndAdvanceCurrentIndex() {
+        return currentIndex.get();
+      }
+    });
+
+    // if there is no call at a given index, return the next highest
+    // priority call available.
+    //   v
+    //0123456789
+    currentIndex.set(3);
+    assertSame(calls.get(3), fcq.poll());
+    assertSame(calls.get(0), fcq.poll());
+    assertSame(calls.get(1), fcq.poll());
+    //      v
+    //--2-456789
+    currentIndex.set(6);
+    assertSame(calls.get(6), fcq.poll());
+    assertSame(calls.get(2), fcq.poll());
+    assertSame(calls.get(4), fcq.poll());
+    //        v
+    //-----5-789
+    currentIndex.set(8);
+    assertSame(calls.get(8), fcq.poll());
+    //         v
+    //-----5-7-9
+    currentIndex.set(9);
+    assertSame(calls.get(9), fcq.poll());
+    assertSame(calls.get(5), fcq.poll());
+    assertSame(calls.get(7), fcq.poll());
+    //----------
+    assertNull(fcq.poll());
+    assertNull(fcq.poll());
+  }
+
   //
   // Ensure that FairCallQueue properly implements BlockingQueue
   //


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message