drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject [1/2] drill git commit: DRILL-1957: Support nested loop join planning in order to enable NOT-IN, Inequality, Cartesian, uncorrelated EXISTS planning.
Date Wed, 29 Apr 2015 03:44:35 GMT
Repository: drill
Updated Branches:
  refs/heads/master 50c5197fb -> 3a232d81e


DRILL-1957: Support nested loop join planning in order to enable NOT-IN, Inequality, Cartesian, uncorrelated EXISTS planning.

Add support for nested loop join planning where right input is scalar and is broadcast.

Add check for scalar subquery for NLJ. Add support for creating a Filter-NLJ plan.

Rebase on the branch with Jinfeng's Calcite rebasing work.

Conflicts:
	exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java

Add unit tests for NLJoin.

Added test for inequality join.

Tests with BroadcastExchange, with HJ/MJ disabled.

Fix filter push down for NL joins by modifying row count computation for joins with always true conditions.  Rebase on master.  Refactor unit tests.

Improved checking of preconditions for NL join.

Handle the case where scalar aggregate is a child of Filter.

DRILL-1957: Support nested loop join planning in order to enable NOT-IN, Inequality, EXISTS planning.

Better checks for cartesian and inequality joins. Rebase on latest master.

Refactor costing for logical join.  Add tests.  Enable more TPC-H tests.

Remove the check for cartesian join from DrillJoinRel constructor.

Clear left and right keys before calling splitJoinCondition.

Address review comments: Remove redundant call to getJoinCategory.  Added comment in DrillRuleSet.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c3b79ac6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c3b79ac6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c3b79ac6

Branch: refs/heads/master
Commit: c3b79ac60a33fa5dcc48f3f49bb54c55dc1923e2
Parents: 50c5197
Author: Aman Sinha <asinha@maprtech.com>
Authored: Sat Mar 14 15:21:46 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Tue Apr 28 20:34:50 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinUtils.java      |  50 +++++
 .../exec/planner/common/DrillJoinRelBase.java   |  51 ++++-
 .../exec/planner/logical/DrillFilterRel.java    |   2 +-
 .../exec/planner/logical/DrillJoinRel.java      |  16 +-
 .../exec/planner/logical/DrillJoinRule.java     |   6 +-
 .../exec/planner/logical/DrillRuleSets.java     |   7 +
 .../exec/planner/physical/HashJoinPrel.java     |   9 +-
 .../exec/planner/physical/HashJoinPrule.java    |  11 +-
 .../drill/exec/planner/physical/JoinPrel.java   |   3 +
 .../exec/planner/physical/JoinPruleBase.java    |  71 ++++---
 .../exec/planner/physical/MergeJoinPrel.java    |  15 +-
 .../exec/planner/physical/MergeJoinPrule.java   |   6 +-
 .../planner/physical/NestedLoopJoinPrel.java    | 114 +++++++++++
 .../planner/physical/NestedLoopJoinPrule.java   | 107 ++++++++++
 .../exec/planner/physical/PlannerSettings.java  |  15 ++
 .../server/options/SystemOptionManager.java     |   3 +
 .../apache/drill/TestDisabledFunctionality.java |   2 +
 .../org/apache/drill/TestTpchDistributed.java   |   9 +-
 .../physical/impl/join/TestNestedLoopJoin.java  | 204 +++++++++++++++++++
 .../src/test/resources/queries/tpch/11_1.sql    |  28 +++
 20 files changed, 657 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 45b1093..5ed5d27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -24,6 +24,9 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillFilterRel;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -38,6 +41,7 @@ import org.apache.drill.exec.resolver.TypeCastRules;
 
 import java.util.LinkedList;
 import java.util.List;
+import com.google.common.collect.Lists;
 
 public class JoinUtils {
   public static enum JoinComparator {
@@ -46,6 +50,12 @@ public class JoinUtils {
     IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator
   }
 
+  public static enum JoinCategory {
+    EQUALITY,  // equality join
+    INEQUALITY,  // inequality join: <>, <, >
+    CARTESIAN   // no join condition
+  }
+
   // Check the comparator for the join condition. Note that a similar check is also
   // done in JoinPrel; however we have to repeat it here because a physical plan
   // may be submitted directly to Drill.
@@ -194,4 +204,44 @@ public class JoinUtils {
       }
     }
   }
+
+  public static boolean isScalarSubquery(RelNode childrel) {
+    DrillAggregateRel agg = null;
+    RelNode currentrel = childrel;
+    while (agg == null && currentrel != null) {
+      if (currentrel instanceof DrillAggregateRel) {
+        agg = (DrillAggregateRel)currentrel;
+      } else if (currentrel instanceof DrillFilterRel) {
+        currentrel = currentrel.getInput(0);
+      } else if (currentrel instanceof RelSubset) {
+        currentrel = ((RelSubset)currentrel).getBest() ;
+      } else {
+        break;
+      }
+    }
+
+    if (agg != null) {
+      if (agg.getGroupSet().isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static JoinCategory getJoinCategory(RelNode left, RelNode right, RexNode condition,
+      List<Integer> leftKeys, List<Integer> rightKeys) {
+    if (condition.isAlwaysTrue()) {
+      return JoinCategory.CARTESIAN;
+    }
+    leftKeys.clear();
+    rightKeys.clear();
+    RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+
+    if (!remaining.isAlwaysTrue() || (leftKeys.size() == 0 || rightKeys.size() == 0) ) {
+      // for practical purposes these cases could be treated as inequality
+      return JoinCategory.INEQUALITY;
+    }
+    return JoinCategory.EQUALITY;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 8dc5cf1..5ab416c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -21,11 +21,12 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.calcite.rel.InvalidRelException;
@@ -57,22 +58,32 @@ public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    List<Integer> tmpLeftKeys = Lists.newArrayList();
-    List<Integer> tmpRightKeys = Lists.newArrayList();
-    RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, tmpLeftKeys, tmpRightKeys);
-    if (!remaining.isAlwaysTrue() || (tmpLeftKeys.size() == 0 || tmpRightKeys.size() == 0)) {
+    JoinCategory category = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys);
+    if (category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY) {
+      if (PrelUtil.getPlannerSettings(planner).isNestedLoopJoinEnabled()) {
+        if (PrelUtil.getPlannerSettings(planner).isNlJoinForScalarOnly()) {
+          if (hasScalarSubqueryInput()) {
+            return computeLogicalJoinCost(planner);
+          } else {
+            return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
+          }
+        } else {
+          return computeLogicalJoinCost(planner);
+        }
+      }
       return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
     }
 
-    // We do not know which join method, i.e HASH-join or MergeJoin, will be used in Logical Planning.
-    // Here, we assume to use Hash-join, since this is a more commonly-used Join method in Drill.
-    return computeHashJoinCost(planner);
-    // return super.computeSelfCost(planner);
+    return computeLogicalJoinCost(planner);
   }
 
   @Override
   public double getRows() {
-    return joinRowFactor * Math.max(this.getLeft().getRows(), this.getRight().getRows());
+    if (this.condition.isAlwaysTrue()) {
+      return joinRowFactor * this.getLeft().getRows() * this.getRight().getRows();
+    } else {
+      return joinRowFactor * Math.max(this.getLeft().getRows(), this.getRight().getRows());
+    }
   }
 
   /**
@@ -98,6 +109,17 @@ public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
     return this.rightKeys;
   }
 
+  protected RelOptCost computeLogicalJoinCost(RelOptPlanner planner) {
+    // During Logical Planning, although we don't care much about the actual physical join that will
+    // be chosen, we do care about which table - bigger or smaller - is chosen as the right input
+    // of the join since that is important at least for hash join and we don't currently have
+    // hybrid-hash-join that can swap the inputs dynamically.  The Calcite planner's default cost of a join
+    // is the same whether the bigger table is used as left input or right. In order to overcome that,
+    // we will use the Hash Join cost as the logical cost such that cardinality of left and right inputs
+    // is considered appropriately.
+    return computeHashJoinCost(planner);
+  }
+
   protected RelOptCost computeHashJoinCost(RelOptPlanner planner) {
     double probeRowCount = RelMetadataQuery.getRowCount(this.getLeft());
     double buildRowCount = RelMetadataQuery.getRowCount(this.getRight());
@@ -131,4 +153,13 @@ public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
     return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0, memCost);
 
   }
+  private boolean hasScalarSubqueryInput() {
+    if (JoinUtils.isScalarSubquery(this.getLeft())
+        || JoinUtils.isScalarSubquery(this.getRight())) {
+      return true;
+    }
+
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
index a914f47..dbd08f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
@@ -31,7 +31,7 @@ import org.apache.calcite.rex.RexNode;
 
 
 public class DrillFilterRel extends DrillFilterRelBase implements DrillRel {
-  protected DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+  public DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
     super(DRILL_LOGICAL, cluster, traits, child, condition);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 1f602c7..dcccdb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -51,26 +51,14 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
       JoinRelType joinType) throws InvalidRelException {
     super(cluster, traits, left, right, condition, joinType);
 
-    RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
-    if (!remaining.isAlwaysTrue() && (leftKeys.size() == 0 || rightKeys.size() == 0)) {
-      // throw new InvalidRelException("DrillJoinRel only supports equi-join");
-    }
+    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
   }
 
   public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
-      JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys, boolean checkCartesian) throws InvalidRelException {
+      JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys) throws InvalidRelException {
     super(cluster, traits, left, right, condition, joinType);
 
     assert (leftKeys != null && rightKeys != null);
-
-    if (checkCartesian)  {
-      List<Integer> tmpLeftKeys = Lists.newArrayList();
-      List<Integer> tmpRightKeys = Lists.newArrayList();
-      RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, tmpLeftKeys, tmpRightKeys);
-      if (!remaining.isAlwaysTrue() && (tmpLeftKeys.size() == 0 || tmpRightKeys.size() == 0)) {
-        // throw new InvalidRelException("DrillJoinRel only supports equi-join");
-      }
-    }
     this.leftKeys = leftKeys;
     this.rightKeys = rightKeys;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
index f832dfe..f3b9f6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
@@ -97,7 +97,7 @@ public class DrillJoinRule extends RelOptRule {
         newJoinCondition = RexUtil.composeConjunction(builder, equijoinList, false);
       } else {
 //        tracer.warning("Non-equijoins are only supported in the presence of an equijoin.");
-        return;
+//        return;
       }
     }
     //else {
@@ -108,11 +108,11 @@ public class DrillJoinRule extends RelOptRule {
     try {
       if (!addFilter) {
        RelNode joinRel = new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, origJoinCondition,
-                                         join.getJoinType(), leftKeys, rightKeys, false);
+                                         join.getJoinType(), leftKeys, rightKeys);
        call.transformTo(joinRel);
       } else {
         RelNode joinRel = new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, newJoinCondition,
-                                           join.getJoinType(), leftKeys, rightKeys, false);
+                                           join.getJoinType(), leftKeys, rightKeys);
         call.transformTo(new DrillFilterRel(join.getCluster(), traits, joinRel, remaining));
       }
     } catch (InvalidRelException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 532fd43..53e1bff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.planner.physical.HashAggPrule;
 import org.apache.drill.exec.planner.physical.HashJoinPrule;
 import org.apache.drill.exec.planner.physical.LimitPrule;
 import org.apache.drill.exec.planner.physical.MergeJoinPrule;
+import org.apache.drill.exec.planner.physical.NestedLoopJoinPrule;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.ProjectPrule;
 import org.apache.drill.exec.planner.physical.PushLimitToTopN;
@@ -235,6 +236,12 @@ public class DrillRuleSets {
 
     }
 
+    // NLJ plans consist of broadcasting the right child, hence we need
+    // broadcast join enabled.
+    if (ps.isNestedLoopJoinEnabled() && ps.isBroadcastJoinEnabled()) {
+      ruleList.add(NestedLoopJoinPrule.INSTANCE);
+    }
+
     return new DrillRuleSet(ImmutableSet.copyOf(ruleList));
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index aca55a0..dc21bdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -24,6 +24,9 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -31,7 +34,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
 
@@ -50,7 +52,7 @@ public class HashJoinPrel  extends JoinPrel {
       JoinRelType joinType, boolean swapped) throws InvalidRelException {
     super(cluster, traits, left, right, condition, joinType);
     this.swapped = swapped;
-    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+    joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys);
   }
 
   @Override
@@ -67,6 +69,9 @@ public class HashJoinPrel  extends JoinPrel {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
       return super.computeSelfCost(planner).multiplyBy(.1);
     }
+    if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY) {
+      return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
+    }
     return computeHashJoinCost(planner);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 24df0b1..1fd0e51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -48,7 +48,8 @@ public class HashJoinPrule extends JoinPruleBase {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    if (!PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled()) {
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    if (!settings.isHashJoinEnabled()) {
       return;
     }
 
@@ -56,7 +57,7 @@ public class HashJoinPrule extends JoinPruleBase {
     final RelNode left = join.getLeft();
     final RelNode right = join.getRight();
 
-    if (!checkPreconditions(join, left, right)) {
+    if (!checkPreconditions(join, left, right, settings)) {
       return;
     }
 
@@ -65,10 +66,12 @@ public class HashJoinPrule extends JoinPruleBase {
     try {
 
       if(isDist){
-        createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
+        createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
+            left, right, null /* left collation */, null /* right collation */, hashSingleKey);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
-          createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
+          createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN,
+              left, right, null /* left collation */, null /* right collation */);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index 59b9f41..45e8bc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.calcite.rel.InvalidRelException;
@@ -47,6 +48,8 @@ import com.google.common.collect.Lists;
  */
 public abstract class JoinPrel extends DrillJoinRelBase implements Prel{
 
+  protected JoinUtils.JoinCategory joincategory;
+
   public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
       JoinRelType joinType) throws InvalidRelException{
     super(cluster, traits, left, right, condition, joinType);

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index d6f1672..fd0ea69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.planner.physical;
 
 import java.util.List;
 
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
@@ -31,8 +33,9 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 
 import com.google.common.collect.ImmutableList;
@@ -41,23 +44,18 @@ import com.google.common.collect.Lists;
 // abstract base class for the join physical rules
 public abstract class JoinPruleBase extends Prule {
 
-  protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN};
+  protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN};
 
   protected JoinPruleBase(RelOptRuleOperand operand, String description) {
     super(operand, description);
   }
 
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right) {
-    if (join.getCondition().isAlwaysTrue()) {
-      // this indicates a cartesian join which is not supported by existing rules
-      return false;
-    }
-
+  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
+      PlannerSettings settings) {
     List<Integer> leftKeys = Lists.newArrayList();
     List<Integer> rightKeys = Lists.newArrayList() ;
-    RexNode remaining = RelOptUtil.splitJoinCondition(left, right, join.getCondition(), leftKeys, rightKeys);
-    if (!remaining.isAlwaysTrue() && (leftKeys.size() == 0 || rightKeys.size() == 0)) {
-      // this is a non-equijoin which is not supported by existing rules
+    JoinCategory category = JoinUtils.getJoinCategory(left, right, join.getCondition(), leftKeys, rightKeys);
+    if (category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY) {
       return false;
     }
     return true;
@@ -167,7 +165,8 @@ public abstract class JoinPruleBase extends Prule {
   // Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
   // if necessary to provide the collation.
-  protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
+  protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoinRel join,
+      final RexNode joinCondition,
       final PhysicalJoinType physicalJoinType,
       final RelNode left, final RelNode right,
       final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
@@ -180,7 +179,8 @@ public abstract class JoinPruleBase extends Prule {
       assert collationLeft != null && collationRight != null;
       traitsLeft = traitsLeft.plus(collationLeft);
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
-    } else {
+    } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN ||
+        physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
     }
 
@@ -199,16 +199,28 @@ public abstract class JoinPruleBase extends Prule {
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
 
             RelNode newLeft = convert(left, newTraitsLeft);
-              return new MergeJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+              return new MergeJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
                                           join.getJoinType());
           }
 
         }.go(join, convertedLeft);
 
 
-      }else{
+      } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+
+          @Override
+          public RelNode convertChild(final DrillJoinRel join,  final RelNode rel) throws InvalidRelException {
+            DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+            RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+            RelNode newLeft = convert(left, newTraitsLeft);
+            return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
+                                         join.getJoinType());
 
+          }
 
+        }.go(join, convertedLeft);
+      } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
         new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
 
           @Override
@@ -216,27 +228,38 @@ public abstract class JoinPruleBase extends Prule {
             DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
-            return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, join.getCondition(),
+            return new NestedLoopJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
                                          join.getJoinType());
-
           }
 
         }.go(join, convertedLeft);
       }
 
-    }else{
+    } else {
       if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-        call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
+        call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
             join.getJoinType()));
 
-      }else{
-        call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, join.getCondition(),
-                                       join.getJoinType()));
+      } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+        call.transformTo(new HashJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
+            join.getJoinType()));
+      } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
+        if (joinCondition.isAlwaysTrue()) {
+          call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
+            join.getJoinType()));
+        } else {
+          RexBuilder builder = join.getCluster().getRexBuilder();
+          RexLiteral condition = builder.makeLiteral(true); // TRUE condition for the NLJ
+
+          FilterPrel newFilterRel = new FilterPrel(join.getCluster(), convertedLeft.getTraitSet(),
+              new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft, convertedRight,
+                  condition, join.getJoinType()),
+              joinCondition);
+          call.transformTo(newFilterRel);
+        }
       }
     }
 
-
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 3c0022f..e7141d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -24,6 +24,8 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -46,15 +48,7 @@ public class MergeJoinPrel  extends JoinPrel {
   public MergeJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
       JoinRelType joinType) throws InvalidRelException {
     super(cluster, traits, left, right, condition, joinType);
-
-    if (condition.isAlwaysTrue()) {
-      throw new InvalidRelException("MergeJoinPrel does not support cartesian product join");
-    }
-
-    RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
-    if (!remaining.isAlwaysTrue() && (leftKeys.size() == 0 || rightKeys.size() == 0)) {
-      throw new InvalidRelException("MergeJoinPrel only supports equi-join");
-    }
+    joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys);
   }
 
 
@@ -72,6 +66,9 @@ public class MergeJoinPrel  extends JoinPrel {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
       return super.computeSelfCost(planner).multiplyBy(.1);
     }
+    if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY) {
+      return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
+    }
     double leftRowCount = RelMetadataQuery.getRowCount(this.getLeft());
     double rightRowCount = RelMetadataQuery.getRowCount(this.getRight());
     // cost of evaluating each leftkey=rightkey join condition

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index cbcc920..dd587b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -53,11 +53,12 @@ public class MergeJoinPrule extends JoinPruleBase {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
     final RelNode left = join.getLeft();
     final RelNode right = join.getRight();
 
-    if (!checkPreconditions(join, left, right)) {
+    if (!checkPreconditions(join, left, right, settings)) {
       return;
     }
 
@@ -71,7 +72,8 @@ public class MergeJoinPrule extends JoinPruleBase {
         createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
-          createBroadcastPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight);
+          createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.MERGE_JOIN,
+              left, right, collationLeft, collationRight);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
new file mode 100644
index 0000000..b35017e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+
+import com.google.common.collect.Lists;
+
+public class NestedLoopJoinPrel  extends JoinPrel {
+
+  public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                      JoinRelType joinType) throws InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType);
+    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+  }
+
+  @Override
+  public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    try {
+      return new NestedLoopJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType);
+    }catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public double getRows() {
+    return this.getLeft().getRows() * this.getRight().getRows();
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1);
+    }
+    double leftRowCount = RelMetadataQuery.getRowCount(this.getLeft());
+    double rightRowCount = RelMetadataQuery.getRowCount(this.getRight());
+    double nljFactor = PrelUtil.getSettings(getCluster()).getNestedLoopJoinFactor();
+
+    // cpu cost of evaluating each leftkey=rightkey join condition
+    double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
+
+    double cpuCost = joinConditionCost * (leftRowCount * rightRowCount) * nljFactor;
+
+    DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
+    return costFactory.makeCost(leftRowCount * rightRowCount, cpuCost, 0, 0, 0);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    final List<String> fields = getRowType().getFieldNames();
+    assert isUnique(fields);
+
+    final List<String> leftFields = left.getRowType().getFieldNames();
+    final List<String> rightFields = right.getRowType().getFieldNames();
+
+    PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
+    PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
+
+    JoinRelType jtype = this.getJoinType();
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys);
+
+    NestedLoopJoinPOP nljoin = new NestedLoopJoinPOP(leftPop, rightPop, conditions, jtype);
+    return creator.addMetadata(this, nljoin);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
new file mode 100644
index 0000000..24be433
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import com.google.common.collect.Lists;
+
+
+public class NestedLoopJoinPrule extends JoinPruleBase {
+  public static final RelOptRule INSTANCE = new NestedLoopJoinPrule("Prel.NestedLoopJoinPrule", RelOptHelper.any(DrillJoinRel.class));
+
+  protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+  private NestedLoopJoinPrule(String name, RelOptRuleOperand operand) {
+    super(operand, name);
+  }
+
+  @Override
+  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
+      PlannerSettings settings) {
+    JoinRelType type = join.getJoinType();
+
+    if (! (type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
+      return false;
+    }
+
+    List<Integer> leftKeys = Lists.newArrayList();
+    List<Integer> rightKeys = Lists.newArrayList() ;
+    JoinCategory category = JoinUtils.getJoinCategory(left, right, join.getCondition(), leftKeys, rightKeys);
+    if (category == JoinCategory.EQUALITY
+        && (settings.isHashJoinEnabled() || settings.isMergeJoinEnabled())) {
+      return false;
+    }
+
+    if (settings.isNlJoinForScalarOnly()) {
+      if (JoinUtils.isScalarSubquery(left) || JoinUtils.isScalarSubquery(right)) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isNestedLoopJoinEnabled();
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    if (!settings.isNestedLoopJoinEnabled()) {
+      return;
+    }
+
+    final DrillJoinRel join = (DrillJoinRel) call.rel(0);
+    final RelNode left = join.getLeft();
+    final RelNode right = join.getRight();
+
+    if (!checkPreconditions(join, left, right, settings)) {
+      return;
+    }
+
+    try {
+
+      if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
+        createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.NESTEDLOOP_JOIN,
+            left, right, null /* left collation */, null /* right collation */);
+      }
+
+    } catch (InvalidRelException e) {
+      tracer.warning(e.toString());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ac86c4a..8f089c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -43,10 +43,13 @@ public class PlannerSettings implements Context{
   public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
   public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
+  public static final OptionValidator NESTEDLOOPJOIN = new BooleanValidator("planner.enable_nestedloopjoin", true);
   public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
   public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
   public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000000);
   public static final OptionValidator BROADCAST_FACTOR = new RangeDoubleValidator("planner.broadcast_factor", 0, Double.MAX_VALUE, 1.0d);
+  public static final OptionValidator NESTEDLOOPJOIN_FACTOR = new RangeDoubleValidator("planner.nestedloopjoin_factor", 0, Double.MAX_VALUE, 100.0d);
+  public static final OptionValidator NLJOIN_FOR_SCALAR = new BooleanValidator("planner.enable_nljoin_for_scalar_only", true);
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d);
   public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange", true);
   public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange", false);
@@ -91,6 +94,14 @@ public class PlannerSettings implements Context{
     return options.getOption(BROADCAST_FACTOR.getOptionName()).float_val;
   }
 
+  public double getNestedLoopJoinFactor(){
+    return options.getOption(NESTEDLOOPJOIN_FACTOR.getOptionName()).float_val;
+  }
+
+  public boolean isNlJoinForScalarOnly() {
+    return options.getOption(NLJOIN_FOR_SCALAR.getOptionName()).bool_val;
+  }
+
   public boolean useDefaultCosting() {
     return useDefaultCosting;
   }
@@ -123,6 +134,10 @@ public class PlannerSettings implements Context{
     return options.getOption(MERGEJOIN.getOptionName()).bool_val;
   }
 
+  public boolean isNestedLoopJoinEnabled() {
+    return options.getOption(NESTEDLOOPJOIN.getOptionName()).bool_val;
+  }
+
   public boolean isMultiPhaseAggEnabled() {
     return options.getOption(MULTIPHASE.getOptionName()).bool_val;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a394efe..9127c7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -51,10 +51,13 @@ public class SystemOptionManager extends BaseOptionManager {
       PlannerSettings.STREAMAGG,
       PlannerSettings.HASHJOIN,
       PlannerSettings.MERGEJOIN,
+      PlannerSettings.NESTEDLOOPJOIN,
       PlannerSettings.MULTIPHASE,
       PlannerSettings.BROADCAST,
       PlannerSettings.BROADCAST_THRESHOLD,
       PlannerSettings.BROADCAST_FACTOR,
+      PlannerSettings.NESTEDLOOPJOIN_FACTOR,
+      PlannerSettings.NLJOIN_FOR_SCALAR,
       PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR,
       PlannerSettings.MUX_EXCHANGE,
       PlannerSettings.DEMUX_EXCHANGE,

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index e049943..d304f26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedDataTypeException;
 import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestDisabledFunctionality extends BaseTestQuery{
@@ -251,6 +252,7 @@ public class TestDisabledFunctionality extends BaseTestQuery{
   }
 
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1325,
+  @Ignore // TODO: currently errors out in NLJ
   public void testSubqueryWithoutCorrelatedJoinCondition() throws Exception {
     try {
       test("select a.lastname " +

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
index 2b41912..5d2a1c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
@@ -80,11 +80,16 @@ public class TestTpchDistributed extends BaseTestQuery {
   }
 
   @Test
-  @Ignore // cartesion problem
+  @Ignore // depends on fix for Calcite-695 or an implementation of SqlSingleValueAggFunction in Drill
   public void tpch11() throws Exception{
     testDistributed("queries/tpch/11.sql");
   }
 
+  @Test // slight variant of tpch-11 that does not require SqlSingleValueAggFunction
+  public void tpch11_1() throws Exception{
+    testDistributed("queries/tpch/11_1.sql");
+  }
+
   @Test
   public void tpch12() throws Exception{
     testDistributed("queries/tpch/12.sql");
@@ -101,13 +106,11 @@ public class TestTpchDistributed extends BaseTestQuery {
   }
 
   @Test
-  @Ignore // non-equality join
   public void tpch15() throws Exception{
     testDistributed("queries/tpch/15.sql");
   }
 
   @Test
-  @Ignore // invalid plan, due to Nulls value NOT IN sub-q
   public void tpch16() throws Exception{
     testDistributed("queries/tpch/16.sql");
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
new file mode 100644
index 0000000..35a95dd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.util.TestTools;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestNestedLoopJoin extends PlanTestBase {
+
+  private static String nlpattern = "NestedLoopJoin";
+  private static final String WORKING_PATH = TestTools.getWorkingPath();
+  private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+  private static final String NLJ = "Alter session set `planner.enable_hashjoin` = false; " +
+      "alter session set `planner.enable_mergejoin` = false; " +
+      "alter session set `planner.enable_nljoin_for_scalar_only` = false; ";
+  private static final String SINGLE_NLJ = "alter session set `planner.disable_exchanges` = true; " + NLJ;
+  private static final String DISABLE_HJ = "alter session set `planner.enable_hashjoin` = false";
+  private static final String ENABLE_HJ = "alter session set `planner.enable_hashjoin` = true";
+  private static final String DISABLE_MJ = "alter session set `planner.enable_mergejoin` = false";
+  private static final String ENABLE_MJ = "alter session set `planner.enable_mergejoin` = true";
+  private static final String DISABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = false";
+  private static final String ENABLE_NLJ_SCALAR = "alter session set `planner.enable_nljoin_for_scalar_only` = true";
+
+  // Test queries used by planning and execution tests
+  private static final String testNlJoinExists_1 = "select r_regionkey from cp.`tpch/region.parquet` "
+      + " where exists (select n_regionkey from cp.`tpch/nation.parquet` "
+      + " where n_nationkey < 10)";
+
+  private static final String testNlJoinNotIn_1 = "select r_regionkey from cp.`tpch/region.parquet` "
+      + " where r_regionkey not in (select n_regionkey from cp.`tpch/nation.parquet` "
+      + "                            where n_nationkey < 4)";
+
+  // not-in subquery produces empty set
+  private static final String testNlJoinNotIn_2 = "select r_regionkey from cp.`tpch/region.parquet` "
+      + " where r_regionkey not in (select n_regionkey from cp.`tpch/nation.parquet` "
+      + "                            where 1=0)";
+
+  private static final String testNlJoinInequality_1 = "select r_regionkey from cp.`tpch/region.parquet` "
+      + " where r_regionkey > (select min(n_regionkey) from cp.`tpch/nation.parquet` "
+      + "                        where n_nationkey < 4)";
+
+  private static final String testNlJoinInequality_2 = "select r.r_regionkey, n.n_nationkey from cp.`tpch/nation.parquet` n "
+      + " inner join cp.`tpch/region.parquet` r on n.n_regionkey < r.r_regionkey where n.n_nationkey < 3";
+
+  private static final String testNlJoinInequality_3 = "select r_regionkey from cp.`tpch/region.parquet` "
+      + " where r_regionkey > (select min(n_regionkey) * 2 from cp.`tpch/nation.parquet` )";
+
+
+  @Test
+  public void testNlJoinExists_1_planning() throws Exception {
+    testPlanMatchingPatterns(testNlJoinExists_1, new String[]{nlpattern}, new String[]{});
+  }
+
+  @Test
+  // @Ignore
+  public void testNlJoinNotIn_1_planning() throws Exception {
+    testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{nlpattern}, new String[]{});
+  }
+
+  @Test
+  public void testNlJoinInequality_1() throws Exception {
+    testPlanMatchingPatterns(testNlJoinInequality_1, new String[]{nlpattern}, new String[]{});
+  }
+
+  @Test
+  public void testNlJoinInequality_2() throws Exception {
+    test(DISABLE_NLJ_SCALAR);
+    testPlanMatchingPatterns(testNlJoinInequality_2, new String[]{nlpattern}, new String[]{});
+    test(ENABLE_NLJ_SCALAR);
+  }
+
+  @Test
+  @Ignore // Re-test after CALCITE-695 is resolved
+  public void testNlJoinInequality_3() throws Exception {
+    test(DISABLE_NLJ_SCALAR);
+    testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{nlpattern}, new String[]{});
+    test(ENABLE_NLJ_SCALAR);
+  }
+
+  @Test
+  public void testNlJoinAggrs_1_planning() throws Exception {
+    String query = "select total1, total2 from "
+       + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` where l_suppkey between 100 and 200), "
+       + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` where l_suppkey between 200 and 300)  ";
+    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+  }
+
+  @Test // equality join and scalar right input, hj and mj disabled
+  public void testNlJoinEqualityScalar_1_planning() throws Exception {
+    String query = "select r_regionkey from cp.`tpch/region.parquet` "
+        + " where r_regionkey = (select min(n_regionkey) from cp.`tpch/nation.parquet` "
+        + "                        where n_nationkey < 10)";
+    test(DISABLE_HJ);
+    test(DISABLE_MJ);
+    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+    test(ENABLE_HJ);
+    test(ENABLE_MJ);
+  }
+
+  @Test // equality join and scalar right input, hj and mj disabled, enforce exchanges
+  public void testNlJoinEqualityScalar_2_planning() throws Exception {
+    String query = "select r_regionkey from cp.`tpch/region.parquet` "
+        + " where r_regionkey = (select min(n_regionkey) from cp.`tpch/nation.parquet` "
+        + "                        where n_nationkey < 10)";
+    test("alter session set `planner.slice_target` = 1");
+    test(DISABLE_HJ);
+    test(DISABLE_MJ);
+    testPlanMatchingPatterns(query, new String[]{nlpattern, "BroadcastExchange"}, new String[]{});
+    test(ENABLE_HJ);
+    test(ENABLE_MJ);
+    test("alter session set `planner.slice_target` = 100000");
+  }
+
+  @Test // equality join and non-scalar right input, hj and mj disabled
+  public void testNlJoinEqualityNonScalar_1_planning() throws Exception {
+    String query = "select r.r_regionkey from cp.`tpch/region.parquet` r inner join cp.`tpch/nation.parquet` n"
+        + " on r.r_regionkey = n.n_regionkey where n.n_nationkey < 10";
+    test(DISABLE_HJ);
+    test(DISABLE_MJ);
+    test(DISABLE_NLJ_SCALAR);
+    testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+    test(ENABLE_HJ);
+    test(ENABLE_MJ);
+    test(ENABLE_NLJ_SCALAR);
+  }
+
+  @Test // equality join and non-scalar right input, hj and mj disabled, enforce exchanges
+  public void testNlJoinEqualityNonScalar_2_planning() throws Exception {
+    String query = String.format("select n.n_nationkey from cp.`tpch/nation.parquet` n, "
+        + " dfs_test.`%s/multilevel/parquet` o "
+        + " where n.n_regionkey = o.o_orderkey and o.o_custkey < 5", TEST_RES_PATH);
+    test("alter session set `planner.slice_target` = 1");
+    test(DISABLE_HJ);
+    test(DISABLE_MJ);
+    test(DISABLE_NLJ_SCALAR);
+    testPlanMatchingPatterns(query, new String[]{nlpattern, "BroadcastExchange"}, new String[]{});
+    test(ENABLE_HJ);
+    test(ENABLE_MJ);
+    test(ENABLE_NLJ_SCALAR);
+    test("alter session set `planner.slice_target` = 100000");
+  }
+
+  // EXECUTION TESTS
+
+  @Test
+  public void testNlJoinExists_1_exec() throws Exception {
+    testBuilder()
+        .sqlQuery(testNlJoinExists_1)
+        .unOrdered()
+        .baselineColumns("r_regionkey")
+        .baselineValues(0)
+        .baselineValues(1)
+        .baselineValues(2)
+        .baselineValues(3)
+        .baselineValues(4)
+        .go();
+  }
+
+  @Test
+  public void testNlJoinNotIn_1_exec() throws Exception {
+    testBuilder()
+        .sqlQuery(testNlJoinNotIn_1)
+        .unOrdered()
+        .baselineColumns("r_regionkey")
+        .baselineValues(2)
+        .baselineValues(3)
+        .baselineValues(4)
+        .go();
+  }
+
+  @Test
+  public void testNlJoinNotIn_2_exec() throws Exception {
+    testBuilder()
+        .sqlQuery(testNlJoinNotIn_2)
+        .unOrdered()
+        .baselineColumns("r_regionkey")
+        .baselineValues(0)
+        .baselineValues(1)
+        .baselineValues(2)
+        .baselineValues(3)
+        .baselineValues(4)
+        .go();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c3b79ac6/exec/java-exec/src/test/resources/queries/tpch/11_1.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/queries/tpch/11_1.sql b/exec/java-exec/src/test/resources/queries/tpch/11_1.sql
new file mode 100644
index 0000000..6eb08fe
--- /dev/null
+++ b/exec/java-exec/src/test/resources/queries/tpch/11_1.sql
@@ -0,0 +1,28 @@
+-- tpch11 using 1395599672 as a seed to the RNG
+select
+  ps.ps_partkey,
+  sum(ps.ps_supplycost * ps.ps_availqty) as `value`
+from
+  cp.`tpch/partsupp.parquet` ps,
+  cp.`tpch/supplier.parquet` s,
+  cp.`tpch/nation.parquet` n
+where
+  ps.ps_suppkey = s.s_suppkey
+  and s.s_nationkey = n.n_nationkey
+  and n.n_name = 'JAPAN'
+group by
+  ps.ps_partkey having
+    sum(ps.ps_supplycost * ps.ps_availqty) > (
+      select
+        sum(ps.ps_supplycost * ps.ps_availqty * 0.01)
+      from
+        cp.`tpch/partsupp.parquet` ps,
+        cp.`tpch/supplier.parquet` s,
+        cp.`tpch/nation.parquet` n
+      where
+        ps.ps_suppkey = s.s_suppkey
+        and s.s_nationkey = n.n_nationkey
+        and n.n_name = 'JAPAN'
+    )
+order by
+  `value` desc;
\ No newline at end of file


Mime
View raw message