drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [1/4] drill git commit: DRILL-5223: Drill should ensure balanced workload assignment at node level in order to get better query performance
Date Wed, 08 Feb 2017 06:23:59 GMT
Repository: drill
Updated Branches:
  refs/heads/master ddcf89548 -> 2b5f5428a


DRILL-5223: Drill should ensure balanced workload assignment at node level in order to get
better query performance

This closes #730


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e386e5e1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e386e5e1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e386e5e1

Branch: refs/heads/master
Commit: e386e5e145945f9644184c642b94f21438215762
Parents: ddcf895
Author: Padma Penumarthy <ppenumar97@yahoo.com>
Authored: Fri Jan 20 17:57:10 2017 -0800
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Tue Feb 7 22:21:52 2017 -0800

----------------------------------------------------------------------
 .../exec/store/schedule/AssignmentCreator.java  | 25 ++++++++-
 .../drill/exec/store/store/TestAssignment.java  | 58 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e386e5e1/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 127264a..aeaf4bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -106,9 +106,13 @@ public class AssignmentCreator<T extends CompleteWork> {
     LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
 
+    // Assign upto minCount per node based on locality.
     unassignedWorkList = assign(workList, endpointIterators, true);
-
+    // Assign upto maxCount per node based on locality.
+    unassignedWorkList = assign(unassignedWorkList, endpointIterators, false);
+    // Assign upto minCount per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, true);
+    // Assign upto maxCount per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, false);
 
     if (unassignedWorkList.size() != 0) {
@@ -241,13 +245,30 @@ public class AssignmentCreator<T extends CompleteWork> {
       mmap.put(endpoint, intList);
     }
 
+    int totalMaxCount = 0;
     for (DrillbitEndpoint endpoint : mmap.keySet()) {
       FragIteratorWrapper wrapper = new FragIteratorWrapper();
       wrapper.iter = Iterators.cycle(mmap.get(endpoint));
-      wrapper.maxCount = maxWork * mmap.get(endpoint).size();
+      // To distribute the load among nodes equally, limit the maxCount per node.
+      int maxCount = (int) ((double)mmap.get(endpoint).size()/incomingEndpoints.size() *
units.size());
+      wrapper.maxCount = Math.min(maxWork * mmap.get(endpoint).size(), maxCount);
+      totalMaxCount += wrapper.maxCount;
       wrapper.minCount = Math.max(maxWork - 1, 1) * mmap.get(endpoint).size();
       map.put(endpoint, wrapper);
     }
+
+    // Take care of leftovers.
+    while (totalMaxCount < units.size()) {
+      for (Entry<DrillbitEndpoint, FragIteratorWrapper> entry : map.entrySet()) {
+        FragIteratorWrapper iteratorWrapper = entry.getValue();
+        iteratorWrapper.maxCount++;
+        totalMaxCount++;
+        if (totalMaxCount == units.size()) {
+          break;
+        }
+      }
+    }
+
     return map;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e386e5e1/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 65d8cf7..aceaf79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -39,18 +39,72 @@ public class TestAssignment {
 
   private static final long FILE_SIZE = 1000;
   private static List<DrillbitEndpoint> endpoints;
+  private static final int numEndPoints = 30;
+  private final int widthPerNode = 23;
 
   @BeforeClass
   public static void setup() {
     endpoints = Lists.newArrayList();
     final String pattern = "node%d";
-    for (int i = 2; i < 32; i++) {
+    for (int i = 0; i < numEndPoints; i++) {
       String host = String.format(pattern, i);
       endpoints.add(DrillbitEndpoint.newBuilder().setAddress(host).build());
     }
   }
 
   @Test
+  public void testBalanceAcrossNodes() throws Exception {
+    int numChunks = widthPerNode * numEndPoints + 100;
+    List<CompleteFileWork> chunks = generateChunks(numChunks);
+    Iterator<DrillbitEndpoint> incomingEndpointsIterator = Iterators.cycle(endpoints);
+    List<DrillbitEndpoint> incomingEndpoints = Lists.newArrayList();
+    List<Integer> expectedAssignments = Lists.newArrayList();
+    List<Integer> actualAssignments = Lists.newArrayList();
+
+    final int width = widthPerNode * numEndPoints;
+    for (int i = 0; i < width; i++) {
+      incomingEndpoints.add(incomingEndpointsIterator.next());
+    }
+
+    // Calculate expected assignments for each node.
+    final int numAssignmentsPerNode = numChunks/numEndPoints;
+    int leftOver = numChunks - numAssignmentsPerNode * numEndPoints;
+    for (int i =0; i < numEndPoints; i++) {
+      int additional = leftOver > 0 ? 1 : 0;
+      expectedAssignments.add(numAssignmentsPerNode + additional);
+      if (leftOver > 0) {
+        leftOver--;
+      }
+    }
+
+    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints,
chunks);
+    System.out.println(mappings.keySet().size());
+
+    // Verify that all fragments have chunks assigned.
+    for (int i = 0; i < width; i++) {
+      Assert.assertTrue("no mapping for entry " + i, mappings.get(i) != null && mappings.get(i).size()
> 0);
+    }
+
+    // Verify actual and expected assignments per node match.
+    // Compute actual assignments for each node.
+    for (int i=0; i < numEndPoints; i++) {
+      int numAssignments = 0;
+      int index = i;
+
+      while(index < numEndPoints * widthPerNode) {
+        numAssignments += mappings.get(index).size();
+        index += numEndPoints;
+      }
+
+      actualAssignments.add(numAssignments);
+    }
+
+    for (int i=0; i < numEndPoints; i++) {
+      Assert.assertTrue(actualAssignments.get(i) == expectedAssignments.get(i));
+    }
+  }
+
+  @Test
   public void manyFiles() throws Exception {
     List<CompleteFileWork> chunks = generateChunks(1000);
 
@@ -58,7 +112,7 @@ public class TestAssignment {
 
     List<DrillbitEndpoint> incomingEndpoints = Lists.newArrayList();
 
-    final int width = 28 * 30;
+    final int width = widthPerNode * numEndPoints;
     for (int i = 0; i < width; i++) {
       incomingEndpoints.add(incomingEndpointsIterator.next());
     }


Mime
View raw message