drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [2/2] drill git commit: DRILL-4446: Support mandatory work assignment to endpoint requirements of operators
Date Wed, 13 Apr 2016 18:27:42 GMT
DRILL-4446: Support mandatory work assignment to endpoint requirements of operators


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/10afc708
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/10afc708
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/10afc708

Branch: refs/heads/master
Commit: 10afc708600ea9f4cb0e7c2cd981b5b1001fea0d
Parents: 9f4fff8
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Wed Mar 2 13:08:36 2016 -0800
Committer: vkorukanti <venki@dremio.com>
Committed: Wed Apr 13 10:36:21 2016 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/EndpointAffinity.java   |  54 ++++-
 .../exec/physical/base/AbstractGroupScan.java   |   5 +
 .../drill/exec/physical/base/AbstractStore.java |   6 +-
 .../drill/exec/physical/base/HasAffinity.java   |   9 +-
 .../drill/exec/physical/config/Screen.java      |   7 +-
 .../planner/fragment/DistributionAffinity.java  |  63 +++++
 .../planner/fragment/FragmentParallelizer.java  |  39 ++++
 .../HardAffinityFragmentParallelizer.java       | 150 ++++++++++++
 .../planner/fragment/ParallelizationInfo.java   |   9 +-
 .../fragment/ParallelizationParameters.java     |  44 ++++
 .../planner/fragment/SimpleParallelizer.java    | 154 ++----------
 .../SoftAffinityFragmentParallelizer.java       | 167 +++++++++++++
 .../drill/exec/planner/fragment/Stats.java      |  12 +-
 .../exec/planner/fragment/StatsCollector.java   |   4 +-
 .../exec/planner/physical/DrillScanPrel.java    |   2 +-
 .../physical/HasDistributionAffinity.java       |  28 +++
 .../drill/exec/planner/physical/ScanPrel.java   |   6 +
 .../drill/exec/planner/physical/ScanPrule.java  |   5 +-
 .../drill/exec/planner/physical/ScreenPrel.java |   7 +-
 .../visitor/ExcessiveExchangeIdentifier.java    |  17 +-
 .../planner/sql/handlers/FindLimit0Visitor.java |  46 +++-
 .../drill/exec/store/sys/SystemTableScan.java   |  15 +-
 .../TestHardAffinityFragmentParallelizer.java   | 232 +++++++++++++++++++
 .../drill/exec/store/sys/TestSystemTable.java   |   6 +
 24 files changed, 937 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index ce97a87..69f7b51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -31,6 +31,14 @@ public class EndpointAffinity {
   private final DrillbitEndpoint endpoint;
   private double affinity = 0.0d;
 
+  // Requires including this endpoint at least once? Default is not required.
+  private boolean mandatory;
+
+  /**
+   * Maximum allowed assignments for this endpoint. Default is {@link Integer#MAX_VALUE}
+   */
+  private int maxWidth = Integer.MAX_VALUE;
+
   /**
    * Create EndpointAffinity instance for given Drillbit endpoint. Affinity is initialized to 0. Affinity can be added
    * after EndpointAffinity object creation using {@link #addAffinity(double)}.
@@ -54,6 +62,22 @@ public class EndpointAffinity {
   }
 
   /**
+   * Creates EndpointAffinity instance for given DrillbitEndpoint, affinity and mandatory assignment requirement flag.
+   * @param endpoint Drillbit endpoint
+   * @param affinity Initial affinity value
+   * @param mandatory Does this endpoint require at least one mandatory assignment?
+   * @param maxWidth Maximum allowed assignments for this endpoint.
+   */
+  public EndpointAffinity(final DrillbitEndpoint endpoint, final double affinity, final boolean mandatory,
+      final int maxWidth) {
+    Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
+    this.endpoint = endpoint;
+    this.affinity = affinity;
+    this.mandatory = mandatory;
+    this.maxWidth = maxWidth;
+  }
+
+  /**
    * Return the Drillbit endpoint in this instance.
    *
    * @return Drillbit endpoint.
@@ -87,12 +111,35 @@ public class EndpointAffinity {
   }
 
   /**
+   * Mark this endpoint as requiring at least one assignment.
+   */
+  public void setAssignmentRequired() {
+    mandatory = true;
+  }
+
+  /**
    * Is this endpoint required to be in fragment endpoint assignment list?
    *
    * @return Returns true for mandatory assignment, false otherwise.
    */
   public boolean isAssignmentRequired() {
-    return Double.POSITIVE_INFINITY == affinity;
+    return mandatory || Double.POSITIVE_INFINITY == affinity;
+  }
+
+  /**
+   * @return Maximum allowed assignments for this endpoint.
+   */
+  public int getMaxWidth() {
+    return maxWidth;
+  }
+
+  /**
+   * Set the new max width as the minimum of the given value and current max width.
+   * @param maxWidth
+   */
+  public void setMaxWidth(final int maxWidth) {
+    Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
+    this.maxWidth = Math.min(this.maxWidth, maxWidth);
   }
 
   @Override
@@ -128,11 +175,12 @@ public class EndpointAffinity {
     } else if (!endpoint.equals(other.endpoint)) {
       return false;
     }
-    return true;
+    return mandatory == other.mandatory;
   }
 
   @Override
   public String toString() {
-    return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
+    return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity +
+        ", mandatory=" + mandatory + ", maxWidth=" + maxWidth + "]";
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 57fbd00..ac42766 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -162,4 +163,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
     return null;
   }
 
+  @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return DistributionAffinity.SOFT;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
index 41fbe57..4edda22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractStore.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.physical.base;
 
 
-
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 
 public abstract class AbstractStore extends AbstractSingle implements Store, Root{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
@@ -33,4 +33,8 @@ public abstract class AbstractStore extends AbstractSingle implements Store, Roo
   }
 
 
+  @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return DistributionAffinity.SOFT;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
index 52462db..6b19173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.drill.exec.physical.EndpointAffinity;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 
 /**
  * Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
@@ -33,5 +34,11 @@ public interface HasAffinity extends PhysicalOperator {
    * @return List of EndpointAffinity objects.
    */
   @JsonIgnore
-  public List<EndpointAffinity> getOperatorAffinity();
+  List<EndpointAffinity> getOperatorAffinity();
+
+  /**
+   * Get distribution affinity which describes the parallelization strategy of the operator.
+   */
+  @JsonIgnore
+  DistributionAffinity getDistributionAffinity();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 97c2405..4eda79e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractStore;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
@@ -48,7 +49,7 @@ public class Screen extends AbstractStore {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.singletonList(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+    return Collections.singletonList(new EndpointAffinity(endpoint, 1, true, /* maxWidth = */ 1));
   }
 
   @Override
@@ -102,4 +103,8 @@ public class Screen extends AbstractStore {
     return CoreOperatorType.SCREEN_VALUE;
   }
 
+  @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return DistributionAffinity.HARD;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
new file mode 100644
index 0000000..d26d413
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributionAffinity.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+/**
+ * Describes an operator's endpoint assignment requirements. Ordering is from no assignment requirement to mandatory
+ * assignment requirements. Changes or new additions should keep the order of increasingly restrictive assignment requirements.
+ */
+public enum DistributionAffinity {
+  /**
+   * No affinity to any endpoints. Operator can run on any endpoint.
+   */
+  NONE(SoftAffinityFragmentParallelizer.INSTANCE),
+
+  /**
+   * Operator has soft distribution affinity to one or more endpoints. Operator performs better when fragments are
+   * assigned to the endpoints with affinity, but not a mandatory requirement.
+   */
+  SOFT(SoftAffinityFragmentParallelizer.INSTANCE),
+
+  /**
+   * Hard distribution affinity to one or more endpoints. Fragments having the operator must be scheduled on the nodes
+   * with affinity.
+   */
+  HARD(HardAffinityFragmentParallelizer.INSTANCE);
+
+  private final FragmentParallelizer fragmentParallelizer;
+
+  DistributionAffinity(final FragmentParallelizer fragmentParallelizer) {
+    this.fragmentParallelizer = fragmentParallelizer;
+  }
+
+  /**
+   * @return {@link FragmentParallelizer} implementation.
+   */
+  public FragmentParallelizer getFragmentParallelizer() {
+    return fragmentParallelizer;
+  }
+
+  /**
+   * Is the current DistributionAffinity less restrictive than the given DistributionAffinity?
+   * @param distributionAffinity
+   * @return
+   */
+  public boolean isLessRestrictiveThan(final DistributionAffinity distributionAffinity) {
+    return ordinal() < distributionAffinity.ordinal();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
new file mode 100644
index 0000000..5237098
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/FragmentParallelizer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+
+/**
+ * Generic interface to provide different parallelization strategies for MajorFragments.
+ */
+public interface FragmentParallelizer {
+  /**
+   * Parallelize the given fragment.
+   *
+   * @param fragment
+   * @param parameters
+   * @param activeEndpoints
+   * @throws PhysicalOperatorSetupException
+   */
+  void parallelizeFragment(final Wrapper fragment, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..550dcb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment requires running on a given set of endpoints. Width
+ * per node depends on the affinity to the endpoint and total width (calculated using costs)
+ */
+public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
+  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(HardAffinityFragmentParallelizer.class);
+
+  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
+
+  private static String EOL = System.getProperty("line.separator");
+
+  private HardAffinityFragmentParallelizer() { /* singleton */}
+
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
+
+    int totalMaxWidth = 0;
+
+    // Go through the affinity map and extract the endpoints that have mandatory assignment requirement
+    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
+      if (entry.getValue().isAssignmentRequired()) {
+        endpointPool.put(entry.getKey(), entry.getValue());
+
+        // Limit the max width of the endpoint to allowed max width.
+        totalMaxWidth += Math.min(parameters.getMaxWidthPerNode(), entry.getValue().getMaxWidth());
+        if (totalMaxWidth < 0) {
+          // If the totalWidth overflows, just keep it at the max value.
+          totalMaxWidth = Integer.MAX_VALUE;
+        }
+      }
+    }
+
+    // Step 1: Find the width taking into account various parameters
+    // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
+    //      with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
+
+    // 1.2. Make sure the width is at least the number of endpoints that require an assignment
+    width = Math.max(endpointPool.size(), width);
+
+    // 1.3. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
+    checkOrThrow(endpointPool.size() <= width, logger,
+        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed fragment max " +
+            "width ({}).", endpointPool.size(), pInfo.getMaxWidth());
+
+    // 1.4 Cap the parallelization width by global max query width
+    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
+    checkOrThrow(endpointPool.size() <= width, logger,
+        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed global query " +
+            "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
+
+    // 1.5 Cap the parallelization width by max allowed parallelization per node
+    width = Math.max(1, Math.min(width, endpointPool.size()*parameters.getMaxWidthPerNode()));
+
+    // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is that if the width is more,
+    // we end up allocating more work units to one or more endpoints that don't have that many work units.
+    width = Math.min(totalMaxWidth, width);
+
+    // Step 2: Select the endpoints
+    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
+
+    // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
+      endpoints.put(entry.getKey(), 1);
+    }
+    int totalAssigned = endpoints.size();
+
+    // 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint
+    int remainingSlots = width - endpoints.size();
+    while (remainingSlots > 0) {
+      for(EndpointAffinity epAf : endpointPool.values()) {
+        final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
+        int currentAssignments = endpoints.get(epAf.getEndpoint());
+        for(int i=0;
+            i < moreAllocation &&
+                totalAssigned < width &&
+                currentAssignments < parameters.getMaxWidthPerNode() &&
+                currentAssignments < epAf.getMaxWidth();
+            i++) {
+          totalAssigned++;
+          currentAssignments++;
+        }
+        endpoints.put(epAf.getEndpoint(), currentAssignments);
+      }
+      final int previousRemainingSlots = remainingSlots;
+      remainingSlots = width - totalAssigned;
+      if (previousRemainingSlots == remainingSlots) {
+        logger.error("Can't parallelize fragment: " +
+            "Every mandatory node has exhausted the maximum width per node limit." + EOL +
+            "Endpoint pool: {}" + EOL + "Assignment so far: {}" + EOL + "Width: {}", endpointPool, endpoints, width);
+        throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
+      }
+    }
+
+    final List<DrillbitEndpoint> assignedEndpoints = Lists.newArrayList();
+    for(Entry<DrillbitEndpoint, Integer> entry : endpoints.entrySet()) {
+      for(int i=0; i < entry.getValue(); i++) {
+        assignedEndpoints.add(entry.getKey());
+      }
+    }
+
+    fragmentWrapper.setWidth(width);
+    fragmentWrapper.assignEndpoints(assignedEndpoints);
+  }
+
+  private static void checkOrThrow(final boolean expr, final Logger logger, final String errMsg, Object... args)
+      throws PhysicalOperatorSetupException {
+    if (!expr) {
+      logger.error(errMsg, args);
+      throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
index 8e775af..ffa843c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
@@ -121,8 +121,13 @@ public class ParallelizationInfo {
 
     // Helper method to add the given EndpointAffinity to the global affinity map
     private void addEndpointAffinity(EndpointAffinity epAff) {
-      if (affinityMap.containsKey(epAff.getEndpoint())) {
-        affinityMap.get(epAff.getEndpoint()).addAffinity(epAff.getAffinity());
+      final EndpointAffinity epAffAgg = affinityMap.get(epAff.getEndpoint());
+      if (epAffAgg != null) {
+        epAffAgg.addAffinity(epAff.getAffinity());
+        if (epAff.isAssignmentRequired()) {
+          epAffAgg.setAssignmentRequired();
+        }
+        epAffAgg.setMaxWidth(epAff.getMaxWidth());
       } else {
         affinityMap.put(epAff.getEndpoint(), epAff);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
new file mode 100644
index 0000000..ea5d9e8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationParameters.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+/**
+ * Interface to implement for passing parameters to {@link FragmentParallelizer}.
+ */
+public interface ParallelizationParameters {
+
+  /**
+   * @return Configured max width per slice of work.
+   */
+  long getSliceTarget();
+
+  /**
+   * @return Configured maximum allowed number of parallelization units per node.
+   */
+  int getMaxWidthPerNode();
+
+  /**
+   * @return Configured maximum allowed number of parallelization units per all nodes in the cluster.
+   */
+  int getMaxGlobalWidth();
+
+  /**
+   * @return Factor by which a node with endpoint affinity will be favored while creating assignment.
+   */
+  double getAffinityFactor();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index e04a8a2..9aad9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -19,19 +19,13 @@ package org.apache.drill.exec.planner.fragment;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -57,9 +51,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 
 /**
@@ -68,17 +60,9 @@ import com.google.common.collect.Sets;
  * parallelization for each major fragment will be determined.  Once the amount of parallelization is done, assignment
  * is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
  */
-public class SimpleParallelizer {
+public class SimpleParallelizer implements ParallelizationParameters {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
 
-  private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator<EndpointAffinity>() {
-    @Override
-    public int compare(EndpointAffinity o1, EndpointAffinity o2) {
-      // Sort in descending order of affinity values
-      return Double.compare(o2.getAffinity(), o1.getAffinity());
-    }
-  });
-
   private final long parallelizationThreshold;
   private final int maxWidthPerNode;
   private final int maxGlobalWidth;
@@ -100,6 +84,25 @@ public class SimpleParallelizer {
     this.affinityFactor = affinityFactor;
   }
 
+  @Override
+  public long getSliceTarget() {
+    return parallelizationThreshold;
+  }
+
+  @Override
+  public int getMaxWidthPerNode() {
+    return maxWidthPerNode;
+  }
+
+  @Override
+  public int getMaxGlobalWidth() {
+    return maxGlobalWidth;
+  }
+
+  @Override
+  public double getAffinityFactor() {
+    return affinityFactor;
+  }
 
   /**
    * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
@@ -209,120 +212,13 @@ public class SimpleParallelizer {
       }
     }
 
-    Fragment fragment = fragmentWrapper.getNode();
-
-    // Step 1: Find stats. Stats include various factors including cost of physical operators, parallelizability of
+    // Find stats. Stats include various factors including cost of physical operators, parallelizability of
     // work in physical operator and affinity of physical operator to certain nodes.
-    fragment.getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
-
-    // Step 2: Find the parallelization width of fragment
-
-    final Stats stats = fragmentWrapper.getStats();
-    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
-
-    // 2.1 Use max cost of all operators in this fragment; this is consistent with the
-    //     calculation that ExcessiveExchangeRemover uses
-    // 2.1. Find the parallelization based on cost
-    int width = (int) Math.ceil(stats.getMaxCost() / parallelizationThreshold);
-
-    // 2.2. Cap the parallelization width by fragment level width limit and system level per query width limit
-    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), maxGlobalWidth));
-
-    // 2.3. Cap the parallelization width by system level per node width limit
-    width = Math.min(width, maxWidthPerNode * activeEndpoints.size());
-
-    // 2.4. Make sure width is at least the min width enforced by operators
-    width = Math.max(parallelizationInfo.getMinWidth(), width);
-
-    // 2.4. Make sure width is at most the max width enforced by operators
-    width = Math.min(parallelizationInfo.getMaxWidth(), width);
-
-    // 2.5 Finally make sure the width is at least one
-    width = Math.max(1, width);
-
-    fragmentWrapper.setWidth(width);
-
-    List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
-        parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth());
-    fragmentWrapper.assignEndpoints(assignedEndpoints);
-  }
-
-  // Assign endpoints based on the given endpoint list, affinity map and width.
-  private List<DrillbitEndpoint> findEndpoints(Collection<DrillbitEndpoint> activeEndpoints,
-      Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width)
-      throws PhysicalOperatorSetupException {
-
-    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
-
-    if (endpointAffinityMap.size() > 0) {
-      // Get EndpointAffinity list sorted in descending order of affinity values
-      List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());
-
-      // Find the number of mandatory nodes (nodes with +infinity affinity).
-      int numRequiredNodes = 0;
-      for(EndpointAffinity ep : sortedAffinityList) {
-        if (ep.isAssignmentRequired()) {
-          numRequiredNodes++;
-        } else {
-          // As the list is sorted in descending order of affinities, we don't need to go beyond the first occurrance
-          // of non-mandatory node
-          break;
-        }
-      }
-
-      if (width < numRequiredNodes) {
-        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
-            "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
-      }
-
-      // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
-      int affinedSlots =
-          Math.max(1, (int) (affinityFactor * width / activeEndpoints.size())) * sortedAffinityList.size();
-
-      // Make sure affined slots is at least the number of mandatory nodes
-      affinedSlots = Math.max(affinedSlots, numRequiredNodes);
-
-      // Cap the affined slots to max parallelization width
-      affinedSlots = Math.min(affinedSlots, width);
-
-      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
-
-      // Keep adding until we have selected "affinedSlots" number of endpoints.
-      while(endpoints.size() < affinedSlots) {
-        EndpointAffinity ea = affinedEPItr.next();
-        endpoints.add(ea.getEndpoint());
-      }
-    }
-
-    // add remaining endpoints if required
-    if (endpoints.size() < width) {
-      // Get a list of endpoints that are not part of the affinity endpoint list
-      List<DrillbitEndpoint> endpointsWithNoAffinity;
-      final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
-
-      if (endpointAffinityMap.size() > 0) {
-        endpointsWithNoAffinity = Lists.newArrayList();
-        for (DrillbitEndpoint ep : activeEndpoints) {
-          if (!endpointsWithAffinity.contains(ep)) {
-            endpointsWithNoAffinity.add(ep);
-          }
-        }
-      } else {
-        endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an
-        // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't
-        // support immutable copy as input.
-      }
-
-      // round robin with random start.
-      Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());
-      Iterator<DrillbitEndpoint> otherEPItr =
-          Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity);
-      while (endpoints.size() < width) {
-        endpoints.add(otherEPItr.next());
-      }
-    }
+    fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
 
-    return endpoints;
+    fragmentWrapper.getStats().getDistributionAffinity()
+        .getFragmentParallelizer()
+        .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
   }
 
   private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..1ebed86
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints with affinities. Width
+ * per node depends on the affinity to the endpoint and total width (calculated using costs). Based on various
+ * factors endpoints which have no affinity can be assigned to run the fragments.
+ */
+public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
+  public static final SoftAffinityFragmentParallelizer INSTANCE = new SoftAffinityFragmentParallelizer();
+
+  private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING =
+      Ordering.from(new Comparator<EndpointAffinity>() {
+        @Override
+        public int compare(EndpointAffinity o1, EndpointAffinity o2) {
+          // Sort in descending order of affinity values
+          return Double.compare(o2.getAffinity(), o1.getAffinity());
+        }
+      });
+
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+    final Fragment fragment = fragmentWrapper.getNode();
+
+    // Find the parallelization width of fragment
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
+
+    // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
+    //    with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
+
+    // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
+
+    // 3. Cap the parallelization width by system level per node width limit
+    width = Math.min(width, parameters.getMaxWidthPerNode() * activeEndpoints.size());
+
+    // 4. Make sure width is at least the min width enforced by operators
+    width = Math.max(parallelizationInfo.getMinWidth(), width);
+
+    // 5. Make sure width is at most the max width enforced by operators
+    width = Math.min(parallelizationInfo.getMaxWidth(), width);
+
+    // 6. Finally make sure the width is at least one
+    width = Math.max(1, width);
+
+    fragmentWrapper.setWidth(width);
+
+    final List<DrillbitEndpoint> assignedEndpoints = findEndpoints(activeEndpoints,
+        parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth(), parameters);
+
+    fragmentWrapper.assignEndpoints(assignedEndpoints);
+  }
+
+  // Assign endpoints based on the given endpoint list, affinity map and width.
+  private List<DrillbitEndpoint> findEndpoints(final Collection<DrillbitEndpoint> activeEndpoints,
+      final Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, final int width,
+      final ParallelizationParameters parameters)
+    throws PhysicalOperatorSetupException {
+
+    final List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+
+    if (endpointAffinityMap.size() > 0) {
+      // Get EndpointAffinity list sorted in descending order of affinity values
+      List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());
+
+      // Find the number of mandatory nodes (nodes with +infinity affinity).
+      int numRequiredNodes = 0;
+      for(EndpointAffinity ep : sortedAffinityList) {
+        if (ep.isAssignmentRequired()) {
+          numRequiredNodes++;
+        } else {
+          // As the list is sorted in descending order of affinities, we don't need to go beyond the first occurrence
+          // of a non-mandatory node
+          break;
+        }
+      }
+
+      if (width < numRequiredNodes) {
+        throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is " +
+            "less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
+      }
+
+      // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details)
+      int affinedSlots =
+          Math.max(1, (int) (parameters.getAffinityFactor() * width / activeEndpoints.size())) * sortedAffinityList.size();
+
+      // Make sure affined slots is at least the number of mandatory nodes
+      affinedSlots = Math.max(affinedSlots, numRequiredNodes);
+
+      // Cap the affined slots to max parallelization width
+      affinedSlots = Math.min(affinedSlots, width);
+
+      Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
+
+      // Keep adding until we have selected "affinedSlots" number of endpoints.
+      while(endpoints.size() < affinedSlots) {
+        EndpointAffinity ea = affinedEPItr.next();
+        endpoints.add(ea.getEndpoint());
+      }
+    }
+
+    // add remaining endpoints if required
+    if (endpoints.size() < width) {
+      // Get a list of endpoints that are not part of the affinity endpoint list
+      List<DrillbitEndpoint> endpointsWithNoAffinity;
+      final Set<DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
+
+      if (endpointAffinityMap.size() > 0) {
+        endpointsWithNoAffinity = Lists.newArrayList();
+        for (DrillbitEndpoint ep : activeEndpoints) {
+          if (!endpointsWithAffinity.contains(ep)) {
+            endpointsWithNoAffinity.add(ep);
+          }
+        }
+      } else {
+        endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints); // Need to create a copy instead of an
+        // immutable copy, because we need to shuffle the list (next statement) and Collections.shuffle() doesn't
+        // support immutable copy as input.
+      }
+
+      // round robin with random start.
+      Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());
+      Iterator<DrillbitEndpoint> otherEPItr =
+          Iterators.cycle(endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity);
+      while (endpoints.size() < width) {
+        endpoints.add(otherEPItr.next());
+      }
+    }
+
+    return endpoints;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
index b5b8ce4..03f16b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.fragment;
 
-
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.planner.fragment.ParallelizationInfo.ParallelizationInfoCollector;
 
@@ -26,6 +25,7 @@ import java.util.List;
 public class Stats {
   private final ParallelizationInfoCollector collector = new ParallelizationInfoCollector();
   private double maxCost = 0.0;
+  private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
 
   public void addParallelizationInfo(ParallelizationInfo parallelizationInfo) {
     collector.add(parallelizationInfo);
@@ -43,6 +43,12 @@ public class Stats {
     collector.addMinWidth(minWidth);
   }
 
+  public void setDistributionAffinity(final DistributionAffinity distributionAffinity) {
+    if (this.distributionAffinity.isLessRestrictiveThan(distributionAffinity)) {
+      this.distributionAffinity = distributionAffinity;
+    }
+  }
+
   public void addEndpointAffinities(List<EndpointAffinity> endpointAffinityList) {
     collector.addEndpointAffinities(endpointAffinityList);
   }
@@ -59,4 +65,8 @@ public class Stats {
   public double getMaxCost() {
     return maxCost;
   }
+
+  public DistributionAffinity getDistributionAffinity() {
+    return distributionAffinity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 4f4e0b5..74031e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -101,7 +101,9 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
   public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
     final Stats stats = wrapper.getStats();
     if (op instanceof HasAffinity) {
-      stats.addEndpointAffinities(((HasAffinity)op).getOperatorAffinity());
+      final HasAffinity hasAffinity = (HasAffinity)op;
+      stats.addEndpointAffinities(hasAffinity.getOperatorAffinity());
+      stats.setDistributionAffinity(hasAffinity.getDistributionAffinity());
     }
     stats.addCost(op.getCost());
     for (PhysicalOperator child : op) {

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
index a452bac..ae236c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.exec.physical.base.GroupScan;
 
-public interface DrillScanPrel extends Prel{
+public interface DrillScanPrel extends Prel, HasDistributionAffinity{
 
   public GroupScan getGroupScan();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
new file mode 100644
index 0000000..42eed19
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HasDistributionAffinity.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+
+/**
+ * Implement this interface if a Prel has distribution affinity requirements.
+ */
+public interface HasDistributionAffinity {
+
+  DistributionAffinity getDistributionAffinity();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index b100d90..0d42a69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.AbstractRelNode;
@@ -161,4 +162,9 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
   public boolean needsFinalColumnReordering() {
     return true;
   }
+
+  @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return groupScan.getDistributionAffinity();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index ee2bde5..d74edf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.plan.RelOptRule;
@@ -37,7 +38,9 @@ public class ScanPrule extends Prule{
 
     GroupScan groupScan = scan.getGroupScan();
 
-    DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
+    DrillDistributionTrait partition =
+        (groupScan.getMaxParallelizationWidth() > 1 || groupScan.getDistributionAffinity() == DistributionAffinity.HARD)
+            ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
 
     final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index ef77dff..f2d10d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -24,13 +24,14 @@ import java.util.List;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.planner.common.DrillScreenRelBase;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 
-public class ScreenPrel extends DrillScreenRelBase implements Prel {
+public class ScreenPrel extends DrillScreenRelBase implements Prel, HasDistributionAffinity {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenPrel.class);
 
@@ -79,4 +80,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel {
     return false;
   }
 
+  @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return DistributionAffinity.HARD;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7686d56..2e95e7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -48,7 +49,10 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
     MajorFragmentStat newFrag = new MajorFragmentStat();
     Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
 
-    if (newFrag.isSingular() && parent.isSingular()) {
+    if (newFrag.isSingular() && parent.isSingular() &&
+        // the exchange can be removed unless both fragments have a strict (HARD) distribution requirement
+        (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
+        ) {
       return newChild;
     } else {
       return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode) newChild));
@@ -57,7 +61,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
 
   @Override
   public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws RuntimeException {
-    s.setSingular();
+    s.addScreen(prel);
     RelNode child = ((Prel)prel.getInput()).accept(this, s);
     return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList(child));
   }
@@ -92,6 +96,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
   }
 
   class MajorFragmentStat {
+    private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
     private double maxRows = 0d;
     private int maxWidth = Integer.MAX_VALUE;
     private boolean isMultiSubScan = false;
@@ -100,13 +105,15 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
       maxRows = Math.max(prel.getRows(), maxRows);
     }
 
-    public void setSingular() {
+    public void addScreen(ScreenPrel screenPrel) {
       maxWidth = 1;
+      distributionAffinity = screenPrel.getDistributionAffinity();
     }
 
     public void addScan(ScanPrel prel) {
       maxWidth = Math.min(maxWidth, prel.getGroupScan().getMaxParallelizationWidth());
       isMultiSubScan = prel.getGroupScan().getMinParallelizationWidth() > 1;
+      distributionAffinity = prel.getDistributionAffinity();
       add(prel);
     }
 
@@ -124,6 +131,10 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
       }
       return w == 1;
     }
+
+    public boolean isDistributionStrict() {
+      return distributionAffinity == DistributionAffinity.HARD;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index fa1fe07..e7460b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -34,6 +35,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -41,14 +43,18 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
 import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.direct.DirectGroupScan;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -114,7 +120,15 @@ public class FindLimit0Visitor extends RelShuttleImpl {
   public static boolean containsLimit0(RelNode rel) {
     FindLimit0Visitor visitor = new FindLimit0Visitor();
     rel.accept(visitor);
-    return visitor.isContains();
+
+    if (!visitor.isContains()) {
+      return false;
+    }
+
+    final FindHardDistributionScans hdVisitor = new FindHardDistributionScans();
+    rel.accept(hdVisitor);
+    // Can't optimize limit 0 if the query contains a table which has a hard distribution requirement.
+    return !hdVisitor.contains();
   }
 
   private boolean contains = false;
@@ -200,7 +214,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
     public final List<TypeProtos.DataMode> dataModes;
 
     public RelDataTypeReader(List<String> columnNames, List<SqlTypeName> columnTypes,
-                             List<TypeProtos.DataMode> dataModes) {
+        List<TypeProtos.DataMode> dataModes) {
       Preconditions.checkArgument(columnNames.size() == columnTypes.size() &&
           columnTypes.size() == dataModes.size());
       this.columnNames = columnNames;
@@ -234,4 +248,32 @@ public class FindLimit0Visitor extends RelShuttleImpl {
     public void close() throws Exception {
     }
   }
+  /**
+   * Visitor that scans the RelNode tree to find whether it contains any scans with hard distribution requirements.
+   */
+  private static class FindHardDistributionScans extends RelShuttleImpl {
+    private boolean contains;
+
+    @Override
+    public RelNode visit(TableScan scan) {
+      // Try DrillTable first; otherwise fall back to the DrillTable held by a DrillTranslatableTable.
+      DrillTable unwrap = scan.getTable().unwrap(DrillTable.class);
+      if (unwrap == null) {
+        unwrap = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
+      }
+
+      try {
+        if (unwrap.getGroupScan().getDistributionAffinity() == DistributionAffinity.HARD) {
+          contains = true;
+        }
+      } catch (final IOException e) {
+        throw new DrillRuntimeException("Failed to get GroupScan from table.", e); // keep the root cause
+      }
+      return scan;
+    }
+
+    public boolean contains() {
+      return contains;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 962e2c6..b77ed23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.sys;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -36,6 +37,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.fragment.DistributionAffinity;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -121,7 +123,7 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
 
   /**
    * If distributed, the scan needs to happen on every node. Since width is enforced, the number of fragments equals
-   * number of Drillbits. And here we set, endpoint affinities to Double.POSITIVE_INFINITY to ensure every
+   * number of Drillbits. Here we mark each endpoint as requiring a mandatory assignment to ensure that every
    * Drillbit executes a fragment.
    * @return the Drillbit endpoint affinities
    */
@@ -129,8 +131,10 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
   public List<EndpointAffinity> getOperatorAffinity() {
     if (table.isDistributed()) {
       final List<EndpointAffinity> affinities = Lists.newArrayList();
-      for (final DrillbitEndpoint endpoint : plugin.getContext().getBits()) {
-        affinities.add(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
+      final Collection<DrillbitEndpoint> bits = plugin.getContext().getBits();
+      final double affinityPerNode = 1d / bits.size();
+      for (final DrillbitEndpoint endpoint : bits) {
+        affinities.add(new EndpointAffinity(endpoint, affinityPerNode, true, /* maxWidth = */ 1));
       }
       return affinities;
     } else {
@@ -139,6 +143,11 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan {
   }
 
   @Override
+  public DistributionAffinity getDistributionAffinity() {
+    return table.isDistributed() ? DistributionAffinity.HARD : DistributionAffinity.SOFT;
+  }
+
+  @Override
   public GroupScan clone(List<SchemaPath> columns) {
     return this;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
new file mode 100644
index 0000000..a58404b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestHardAffinityFragmentParallelizer.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.fragment.HardAffinityFragmentParallelizer.INSTANCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestHardAffinityFragmentParallelizer {
+
+  // Create a set of test endpoints
+  private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
+  private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node1", 30011);
+  private static final DrillbitEndpoint N2_EP1 = newDrillbitEndpoint("node2", 30010);
+  private static final DrillbitEndpoint N2_EP2 = newDrillbitEndpoint("node2", 30011);
+  private static final DrillbitEndpoint N3_EP1 = newDrillbitEndpoint("node3", 30010);
+  private static final DrillbitEndpoint N3_EP2 = newDrillbitEndpoint("node3", 30011);
+  private static final DrillbitEndpoint N4_EP2 = newDrillbitEndpoint("node4", 30011);
+
+  @Mocked private Fragment fragment;
+  @Mocked private PhysicalOperator root;
+
+  private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+  }
+
+  private static final ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
+      final int maxGlobalWidth) {
+    return new ParallelizationParameters() {
+      @Override
+      public long getSliceTarget() {
+        return threshold;
+      }
+
+      @Override
+      public int getMaxWidthPerNode() {
+        return maxWidthPerNode;
+      }
+
+      @Override
+      public int getMaxGlobalWidth() {
+        return maxGlobalWidth;
+      }
+
+      /**
+       * {@link HardAffinityFragmentParallelizer} doesn't use the affinity factor.
+       * @return a fixed placeholder value of 0.0
+       */
+      @Override
+      public double getAffinityFactor() {
+        return 0.0f;
+      }
+    };
+  }
+
+  private final Wrapper newWrapper(double cost, int minWidth, int maxWidth, List<EndpointAffinity> epAffs) {
+    new NonStrictExpectations() {
+      {
+        fragment.getRoot(); result = root;
+      }
+    };
+
+    final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
+    final Stats stats = fragmentWrapper.getStats();
+    stats.setDistributionAffinity(DistributionAffinity.HARD);
+    stats.addCost(cost);
+    stats.addMinWidth(minWidth);
+    stats.addMaxWidth(maxWidth);
+    stats.addEndpointAffinities(epAffs);
+
+    return fragmentWrapper;
+  }
+
+  @Test
+  public void simpleCase1() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
+
+    // Expect the fragment parallelization to be just one because:
+    // The cost (200) is below the threshold (SLICE_TARGET_DEFAULT), which gives a width of 200/10000 = ~1.
+    assertEquals(1, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+    assertEquals(1, assignedEps.size());
+    assertEquals(N1_EP1, assignedEps.get(0));
+  }
+
+  @Test
+  public void simpleCase2() throws Exception {
+    // Set the slice target to 1
+    final Wrapper wrapper = newWrapper(200, 1, 20, Collections.singletonList(new EndpointAffinity(N1_EP1, 1.0, true, MAX_VALUE)));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
+
+    // Expect the fragment parallelization to be 5:
+    // 1. the cost (200) is above the slice target (set to 1 here), which gives 200/1=200 width, and
+    // 2. Max width per node is 5 (limits the width 200 to 5)
+    assertEquals(5, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+    assertEquals(5, assignedEps.size());
+    for (DrillbitEndpoint ep : assignedEps) {
+      assertEquals(N1_EP1, ep);
+    }
+  }
+
+  @Test
+  public void multiNodeCluster1() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 20,
+        ImmutableList.of(
+            new EndpointAffinity(N1_EP1, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N2_EP1, 0.10, true, MAX_VALUE),
+            new EndpointAffinity(N3_EP2, 0.20, true, MAX_VALUE),
+            new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(SLICE_TARGET_DEFAULT, 5, 20), null);
+
+    // Expect the fragment parallelization to be 5 because:
+    // 1. The cost (200) is below the threshold (SLICE_TARGET_DEFAULT) (which gives width of 200/10000 = ~1) and
+    // 2. The number of mandatory node assignments is 5, which overrides the cost-based width of 1.
+    assertEquals(5, wrapper.getWidth());
+
+    // As there are 5 required eps and the width is 5, everyone gets assigned 1.
+    final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+    assertEquals(5, assignedEps.size());
+    assertTrue(assignedEps.contains(N1_EP1));
+    assertTrue(assignedEps.contains(N1_EP2));
+    assertTrue(assignedEps.contains(N2_EP1));
+    assertTrue(assignedEps.contains(N3_EP2));
+    assertTrue(assignedEps.contains(N4_EP2));
+  }
+
+  @Test
+  public void multiNodeCluster2() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 20,
+        ImmutableList.of(
+            new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+            new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+            new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1, 5, 20), null);
+
+    // Expect the fragment parallelization to be 20 because:
+    // 1. the cost (200) is above the slice target (set to 1 here), which gives 200/1=200 width, and
+    // 2. The number of mandatory node assignments is 5 (current width 200 satisfies the requirement)
+    // 3. max fragment width is 20 which limits the width
+    assertEquals(20, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEps = wrapper.getAssignedEndpoints();
+    assertEquals(20, assignedEps.size());
+    final HashMultiset<DrillbitEndpoint> counts = HashMultiset.create();
+    for(final DrillbitEndpoint ep : assignedEps) {
+      counts.add(ep);
+    }
+    // Each node gets at most 5.
+    assertTrue(counts.count(N1_EP2) <= 5);
+    assertTrue(counts.count(N2_EP2) <= 5);
+    assertTrue(counts.count(N3_EP1) <= 5);
+    assertTrue(counts.count(N4_EP2) <= 5);
+    assertTrue(counts.count(N1_EP1) <= 5);
+  }
+
+  @Test
+  public void multiNodeClusterNegative1() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 20,
+        ImmutableList.of(
+            new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+            new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+            new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+        ));
+
+    try {
+      INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 2), null);
+      fail("Expected an exception, because max global query width (2) is less than the number of mandatory nodes (5)");
+    } catch (Exception e) {
+      // ok
+    }
+  }
+
+  @Test
+  public void multiNodeClusterNegative2() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 3,
+        ImmutableList.of(
+            new EndpointAffinity(N1_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N2_EP2, 0.15, true, MAX_VALUE),
+            new EndpointAffinity(N3_EP1, 0.10, true, MAX_VALUE),
+            new EndpointAffinity(N4_EP2, 0.20, true, MAX_VALUE),
+            new EndpointAffinity(N1_EP1, 0.20, true, MAX_VALUE)
+        ));
+
+    try {
+      INSTANCE.parallelizeFragment(wrapper, newParameters(1, 2, 2), null);
+      fail("Expected an exception, because max fragment width (3) is less than the number of mandatory nodes (5)");
+    } catch (Exception e) {
+      // ok
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/10afc708/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
index e86fc28..4c29dbe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestSystemTable.java
@@ -19,11 +19,17 @@ package org.apache.drill.exec.store.sys;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.ExecConstants;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSystemTable extends BaseTestQuery {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSystemTable.class);
 
+  @BeforeClass
+  public static void setupMultiNodeCluster() throws Exception {
+    updateTestCluster(3, null);
+  }
+
   @Test
   public void alterSessionOption() throws Exception {
 


Mime
View raw message