drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [20/27] drill git commit: DRILL-5304: Queries fail intermittently when there is skew in data distribution
Date Thu, 02 Mar 2017 20:59:47 GMT
DRILL-5304: Queries fail intermittently when there is skew in data distribution

close #766


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69de3a1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69de3a1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69de3a1e

Branch: refs/heads/master
Commit: 69de3a1e409bb1fb9a25e679ce1750d9f9daf238
Parents: 974c613
Author: Padma Penumarthy <ppenumar97@yahoo.com>
Authored: Mon Feb 27 18:32:24 2017 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Wed Mar 1 23:15:34 2017 -0800

----------------------------------------------------------------------
 .../SoftAffinityFragmentParallelizer.java       |  2 +-
 .../exec/store/schedule/AssignmentCreator.java  | 28 +++++++++++++-------
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
index 1ebed86..644263e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -117,7 +117,7 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer
{
 
       // Find the maximum number of slots which should go to endpoints with affinity (See
DRILL-825 for details)
       int affinedSlots =
-          Math.max(1, (int) (parameters.getAffinityFactor() * width / activeEndpoints.size()))
* sortedAffinityList.size();
+          Math.max(1, (int) (Math.ceil((double)parameters.getAffinityFactor() * width / activeEndpoints.size())
* sortedAffinityList.size()));
 
       // Make sure affined slots is at least the number of mandatory nodes
       affinedSlots = Math.max(affinedSlots, numRequiredNodes);

http://git-wip-us.apache.org/repos/asf/drill/blob/69de3a1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index aeaf4bf..198d1ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -106,13 +106,16 @@ public class AssignmentCreator<T extends CompleteWork> {
     LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
 
-    // Assign upto minCount per node based on locality.
-    unassignedWorkList = assign(workList, endpointIterators, true);
     // Assign upto maxCount per node based on locality.
-    unassignedWorkList = assign(unassignedWorkList, endpointIterators, false);
+    unassignedWorkList = assign(workList, endpointIterators, false);
+
     // Assign upto minCount per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, true);
-    // Assign upto maxCount per node in a round robin fashion.
+
+    // Assign upto maxCount + leftovers per node based on locality.
+    unassignedWorkList = assign(unassignedWorkList, endpointIterators,  true);
+
+    // Assign upto maxCount + leftovers per node in a round robin fashion.
     assignLeftovers(unassignedWorkList, endpointIterators, false);
 
     if (unassignedWorkList.size() != 0) {
@@ -127,10 +130,12 @@ public class AssignmentCreator<T extends CompleteWork> {
    *
    * @param workList the list of work units to assign
    * @param endpointIterators the endpointIterators to assign to
-   * @param assignMinimum whether to assign only up to the minimum required
+   * @param assignMaxLeftOvers whether to assign upto maximum including leftovers
    * @return a list of unassigned work units
    */
-  private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>>
workList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum)
{
+  private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>>
workList,
+                                                     Map<DrillbitEndpoint,FragIteratorWrapper>
endpointIterators,
+                                                     boolean assignMaxLeftOvers) {
     LinkedList<WorkEndpointListPair<T>> currentUnassignedList = Lists.newLinkedList();
     outer: for (WorkEndpointListPair<T> workPair : workList) {
       List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
@@ -139,7 +144,7 @@ public class AssignmentCreator<T extends CompleteWork> {
         if (iteratorWrapper == null) {
           continue;
         }
-        if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount))
{
+        if (iteratorWrapper.count < (assignMaxLeftOvers ? (iteratorWrapper.maxCount +
iteratorWrapper.maxCountLeftOver) : iteratorWrapper.maxCount)) {
           Integer assignment = iteratorWrapper.iter.next();
           iteratorWrapper.count++;
           mappings.put(assignment, workPair.work);
@@ -157,9 +162,11 @@ public class AssignmentCreator<T extends CompleteWork> {
    * @param endpointIterators the endpointIterators to assign to
    * @param assignMinimum wheterh to assign the minimum amount
    */
-  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList,
Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum)
{
+  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList,
+                               Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators,
+                               boolean assignMinimum) {
     outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
-      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount))
{
+      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : (iteratorWrapper.maxCount
+ iteratorWrapper.maxCountLeftOver))) {
         WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
         if (workPair == null) {
           break outer;
@@ -261,7 +268,7 @@ public class AssignmentCreator<T extends CompleteWork> {
     while (totalMaxCount < units.size()) {
       for (Entry<DrillbitEndpoint, FragIteratorWrapper> entry : map.entrySet()) {
         FragIteratorWrapper iteratorWrapper = entry.getValue();
-        iteratorWrapper.maxCount++;
+        iteratorWrapper.maxCountLeftOver++;
         totalMaxCount++;
         if (totalMaxCount == units.size()) {
           break;
@@ -279,6 +286,7 @@ public class AssignmentCreator<T extends CompleteWork> {
   private static class FragIteratorWrapper {
     int count = 0;
     int maxCount;
+    int maxCountLeftOver;
     int minCount;
     Iterator<Integer> iter;
   }


Mime
View raw message