tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/2] tajo git commit: TAJO-680: Improve the IN operator to support sub queries.
Date Fri, 14 Aug 2015 03:33:59 GMT
Repository: tajo
Updated Branches:
  refs/heads/master f0ab0ca20 -> 042c3e882


http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/expr/ValueSetEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/ValueSetEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/ValueSetEval.java
new file mode 100644
index 0000000..4e24f5d
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/ValueSetEval.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.expr;
+
+import org.apache.tajo.datum.Datum;
+
+/**
+ * ValueSetEval is an abstract class to represent both {@link RowConstantEval} and {@link SubqueryEval}.
+ * This is allowed only for the right child of {@link InEval}.
+ * A ValueSetEval has no child expressions: {@link #childNum()} is always 0, {@link #getChild(int)}
+ * always returns null, and both traversal methods visit only this node.
+ */
+public abstract class ValueSetEval extends EvalNode implements Cloneable {
+
+  public ValueSetEval(EvalType evalType) {
+    super(evalType); // the eval type is passed straight through to EvalNode
+  }
+
+  public abstract Datum[] getValues(); // implemented by each concrete subclass
+
+  @Override
+  public int childNum() {
+    return 0; // no child expressions
+  }
+
+  @Override
+  public EvalNode getChild(int idx) {
+    return null; // there is no child for any index; idx is ignored
+  }
+
+  @Override
+  public void preOrder(EvalNodeVisitor visitor) {
+    visitor.visit(this); // nothing to descend into, so only this node is visited
+  }
+
+  @Override
+  public void postOrder(EvalNodeVisitor visitor) {
+    visitor.visit(this); // same as preOrder: only this node is visited
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
index 403d0b8..7984024 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/GreedyHeuristicJoinOrderAlgorithm.java
@@ -306,7 +306,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm
{
           if (interchangeableWithRightVertex.contains(rightTarget)) {
             JoinEdge targetEdge = joinGraph.getEdge(leftTarget, rightTarget);
             if (targetEdge == null) {
-              if (joinGraph.isSymmetricJoinOnly()) {
+              if (joinGraph.allowArbitraryCrossJoin()) {
                 // Since the targets of the both sides are searched with symmetric characteristics,
                 // the join type is assumed as CROSS.
                 // TODO: This must be improved to consider a case when a query involves multiple
commutative and
@@ -380,6 +380,11 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm
{
                   SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema()) /
                   SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getRightVertex().getSchema()));
           break;
+        case LEFT_ANTI:
+        case LEFT_SEMI:
+          factor *= DEFAULT_SELECTION_FACTOR * SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getSchema())
/
+              SchemaUtil.estimateRowByteSizeWithSchema(joinEdge.getLeftVertex().getSchema());
+          break;
         case INNER:
         default:
           // by default, do the same operation with that of inner join

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraph.java b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraph.java
index 8fcdce7..1d4d48d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraph.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.plan.joinorder;
 
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.plan.logical.JoinSpec;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.util.graph.SimpleUndirectedGraph;
@@ -29,11 +30,14 @@ import java.util.List;
  */
 public class JoinGraph extends SimpleUndirectedGraph<JoinVertex, JoinEdge> {
 
-  private boolean isSymmetricJoinOnly = true;
+  private boolean allowArbitraryCrossJoin = true;
 
   public JoinEdge addJoin(JoinGraphContext context, JoinSpec joinSpec, JoinVertex left, JoinVertex
right) {
     JoinEdge edge = context.getCachedOrNewJoinEdge(joinSpec, left, right);
-    isSymmetricJoinOnly &= PlannerUtil.isCommutativeJoinType(edge.getJoinType());
+    // TODO: the below will be improved after TAJO-1683
+    allowArbitraryCrossJoin &= PlannerUtil.isCommutativeJoinType(edge.getJoinType())
+        || edge.getJoinType() == JoinType.LEFT_SEMI || edge.getJoinType() == JoinType.LEFT_ANTI;
+
     this.addEdge(left, right, edge);
     List<JoinEdge> incomeToLeft = getIncomingEdges(left);
     if (incomeToLeft == null || incomeToLeft.isEmpty()) {
@@ -46,7 +50,7 @@ public class JoinGraph extends SimpleUndirectedGraph<JoinVertex, JoinEdge>
{
     return edge;
   }
 
-  public boolean isSymmetricJoinOnly() {
-    return isSymmetricJoinOnly;
+  public boolean allowArbitraryCrossJoin() {
+    return allowArbitraryCrossJoin;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java
index 3f6a1ca..35a7933 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/joinorder/JoinOrderingUtil.java
@@ -99,8 +99,10 @@ public class JoinOrderingUtil {
       JoinedRelationsVertex tempLeftChild = new JoinedRelationsVertex(leftEdge);
       JoinEdge tempEdge = context.getCachedOrNewJoinEdge(rightEdge.getJoinSpec(), tempLeftChild,
           rightEdge.getRightVertex());
-      if ((rightEdge.getJoinType() != JoinType.INNER && rightEdge.getJoinType() !=
JoinType.CROSS)
-          || (leftEdge.getJoinType() != JoinType.INNER && leftEdge.getJoinType()
!= JoinType.CROSS)) {
+      if ((rightEdge.getJoinType() != JoinType.INNER && rightEdge.getJoinType() !=
JoinType.CROSS
+          && rightEdge.getJoinType() != JoinType.LEFT_SEMI && rightEdge.getJoinType()
!= JoinType.LEFT_ANTI)
+          || (leftEdge.getJoinType() != JoinType.INNER && leftEdge.getJoinType()
!= JoinType.CROSS
+          && leftEdge.getJoinType() != JoinType.LEFT_SEMI && leftEdge.getJoinType()
!= JoinType.LEFT_ANTI)) {
         if (!findJoinConditionForJoinVertex(context.getCandidateJoinConditions(), tempEdge,
true).isEmpty()) {
           return false;
         }
@@ -139,6 +141,8 @@ public class JoinOrderingUtil {
    * (A full B) full C    | A full (B full C)     | Equivalent
    * ==============================================================
    *
+   * Cross, Semi and Anti joins follow the rule of the Inner join.
+   *
    * @param leftType
    * @param rightType
    * @return true if two join types are associative.
@@ -148,8 +152,12 @@ public class JoinOrderingUtil {
       return true;
     }
 
-    if (leftType == JoinType.INNER && rightType == JoinType.CROSS ||
-        leftType == JoinType.CROSS && rightType == JoinType.INNER) {
+    boolean isLeftInner = leftType == JoinType.INNER || leftType == JoinType.CROSS
+        || leftType == JoinType.LEFT_SEMI || leftType == JoinType.LEFT_ANTI;
+    boolean isRightInner = rightType == JoinType.INNER || rightType == JoinType.CROSS
+        || rightType == JoinType.LEFT_SEMI || rightType == JoinType.LEFT_ANTI;
+
+    if (isLeftInner && isRightInner) {
       return true;
     }
 
@@ -164,7 +172,7 @@ public class JoinOrderingUtil {
       return false;
     }
 
-    if ((leftType == JoinType.INNER) || leftType == JoinType.CROSS) {
+    if (isLeftInner) {
       if (rightType == JoinType.LEFT_OUTER) {
         return true;
       } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
index ced9a36..60a9405 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
@@ -32,6 +32,8 @@ import org.apache.tajo.catalog.Schema;
  */
 public abstract class RelationNode extends LogicalNode {
 
+  protected boolean nameResolveBase = true;
+
   protected RelationNode(int pid, NodeType nodeType) {
     super(pid, nodeType);
     assert(nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType
== NodeType.TABLE_SUBQUERY);
@@ -58,4 +60,12 @@ public abstract class RelationNode extends LogicalNode {
    * @return A logical schema
    */
   public abstract Schema getLogicalSchema();
+
+  public boolean isNameResolveBase() {
+    return nameResolveBase;
+  }
+
+  public void setNameResolveBase(boolean isCandidate) {
+    this.nameResolveBase = isCandidate;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
index 3eb51ba..f5b9c43 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
@@ -26,11 +26,7 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.NestedPathUtil;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.exception.AmbiguousTableException;
-import org.apache.tajo.exception.UndefinedColumnException;
-import org.apache.tajo.exception.UndefinedTableException;
-import org.apache.tajo.exception.AmbiguousColumnException;
-import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.RelationNode;
@@ -150,14 +146,31 @@ public abstract class NameResolver {
    * @throws PlanningException
    */
   static Column resolveFromRelsWithinBlock(LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                                  ColumnReferenceExpr columnRef)
+                                           ColumnReferenceExpr columnRef)
       throws AmbiguousColumnException, AmbiguousTableException, UndefinedColumnException,
UndefinedTableException {
-
     String qualifier;
     String canonicalName;
 
     if (columnRef.hasQualifier()) {
-      Pair<String, String> normalized = lookupQualifierAndCanonicalName(block, columnRef);
+      Pair<String, String> normalized;
+      try {
+        normalized = lookupQualifierAndCanonicalName(block, columnRef);
+      } catch (UndefinedColumnException udce) {
+        // Is it a correlated subquery?
+        // If the column is not found in the current block, search the relations of all ancestor blocks.
+        LogicalPlan.QueryBlock current = block;
+        while (!plan.getRootBlock().getName().equals(current.getName())) {
+          LogicalPlan.QueryBlock parentBlock = plan.getParentBlock(current);
+          for (RelationNode relationNode : parentBlock.getRelations()) {
+            if (relationNode.getLogicalSchema().containsByQualifiedName(columnRef.getCanonicalName()))
{
+              throw new NotImplementedException("Correlated subquery");
+            }
+          }
+          current = parentBlock;
+        }
+
+        throw udce;
+      }
       qualifier = normalized.getFirst();
       canonicalName = normalized.getSecond();
 
@@ -227,9 +240,11 @@ public abstract class NameResolver {
     List<Column> candidates = TUtil.newList();
 
     for (RelationNode rel : block.getRelations()) {
-      Column found = rel.getLogicalSchema().getColumn(columnName);
-      if (found != null) {
-        candidates.add(found);
+      if (rel.isNameResolveBase()) {
+        Column found = rel.getLogicalSchema().getColumn(columnName);
+        if (found != null) {
+          candidates.add(found);
+        }
       }
     }
 
@@ -363,7 +378,7 @@ public abstract class NameResolver {
 
     // throw an exception if no column is found or if two or more columns are found
     if (guessedRelations.size() == 0) {
-      throw new UndefinedColumnException(columnRef.getQualifier());
+      throw new UndefinedColumnException(columnRef.getCanonicalName());
     } else if (guessedRelations.size() > 1) {
       throw new AmbiguousColumnException(columnRef.getCanonicalName());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
index f73aa54..120529c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
@@ -40,6 +40,8 @@ public class BaseLogicalPlanRewriteRuleProvider extends LogicalPlanRewriteRulePr
     List<Class<? extends LogicalPlanRewriteRule>> rules = TUtil.newList();
 
     rules.add(CommonConditionReduceRule.class);
+    // In-subquery rewrite phase must be executed before the filter push down phase.
+    rules.add(InSubqueryRewriteRule.class);
 
     if (systemConf.getBoolVar(TajoConf.ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) {
       rules.add(FilterPushDownRule.class);

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/InSubqueryRewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/InSubqueryRewriteRule.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/InSubqueryRewriteRule.java
new file mode 100644
index 0000000..d0ff8b6
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/InSubqueryRewriteRule.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlan.QueryBlock;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * InSubqueryRewriteRule finds all subqueries occurring in the where clause with "IN" keywords,
+ * and replaces them with appropriate join plans.
+ * A plain IN predicate becomes a {@code LEFT_SEMI} join and a negated one becomes a {@code LEFT_ANTI} join;
+ * the subquery side is additionally de-duplicated with a group-by on its output columns.
+ * This rule must be executed before {@link FilterPushDownRule}.
+ *
+ */
+public class InSubqueryRewriteRule implements LogicalPlanRewriteRule {
+
+  private static final String NAME = "InSubqueryRewrite";
+  private final Rewriter rewriter = new Rewriter();
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    for (LogicalNode eachNode : PlannerUtil.findAllNodes(context.getPlan().getRootNode(),
NodeType.SELECTION)) {
+      SelectionNode selectionNode = (SelectionNode) eachNode;
+      if (!extractInSubquery(selectionNode.getQual()).isEmpty()) {
+        return true; // one selection with an IN subquery is enough to run the rule
+      }
+    }
+
+    return false;
+  }
+
+  static List<InEval> extractInSubquery(EvalNode qual) {
+    List<InEval> inSubqueries = TUtil.newList();
+    for (EvalNode eachQual : EvalTreeUtil.findEvalsByType(qual, EvalType.IN)) {
+      InEval inEval = (InEval) eachQual;
+      if (inEval.getRightExpr().getType() == EvalType.SUBQUERY) { // IN predicates with other right operands are ignored
+        inSubqueries.add(inEval);
+      }
+    }
+    return inSubqueries;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException
{
+    LogicalPlan.QueryBlock rootBlock = context.getPlan().getRootBlock();
+    LogicalPlan plan = context.getPlan();
+    rewriter.visit(context.getQueryContext(), plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
+    return plan; // the same plan instance is returned after the visitor has modified it
+  }
+
+  private static final class Rewriter extends BasicLogicalPlanVisitor<Object, Object>
{
+    @Override
+    public Object visitFilter(Object context, LogicalPlan plan, LogicalPlan.QueryBlock block,
SelectionNode node,
+                              Stack<LogicalNode> stack) throws TajoException {
+      // Since InSubqueryRewriteRule is executed before FilterPushDownRule,
+      // we can expect that in-subqueries are found at only SelectionNode.
+
+      // Visit every child first.
+      List<InEval> inSubqueries = extractInSubquery(node.getQual());
+      stack.push(node);
+      for (InEval eachIn : inSubqueries) {
+        SubqueryEval subqueryEval = eachIn.getRightExpr();
+        QueryBlock childBlock = plan.getBlock(subqueryEval.getSubQueryNode().getSubQuery());
+        visit(context, plan, childBlock, childBlock.getRoot(), stack); // descend into the subquery's own block
+      }
+      visit(context, plan, block, node.getChild(), stack); // then the selection's regular child
+      stack.pop();
+
+      LogicalNode baseRelation = node.getChild(); // left input of the first join to be created
+      for (InEval eachIn : inSubqueries) {
+        // 1. find the base relation for the column of the outer query
+
+        // We assume that the left child of an in-subquery is either a FieldEval or a CastEval.
+        Preconditions.checkArgument(eachIn.getLeftExpr().getType() == EvalType.FIELD ||
+            eachIn.getLeftExpr().getType() == EvalType.CAST);
+        EvalNode leftEval = eachIn.getLeftExpr();
+        SubqueryEval subqueryEval = eachIn.getRightExpr();
+        QueryBlock childBlock = plan.getBlock(subqueryEval.getSubQueryNode().getSubQuery());
+
+        // 2. create join
+        JoinType joinType = eachIn.isNot() ? JoinType.LEFT_ANTI : JoinType.LEFT_SEMI;
+        JoinNode joinNode = new JoinNode(plan.newPID());
+        joinNode.init(joinType, baseRelation, subqueryEval.getSubQueryNode());
+        joinNode.setJoinQual(buildJoinCondition(leftEval, subqueryEval.getSubQueryNode()));
+        ProjectionNode projectionNode = PlannerUtil.findTopNode(subqueryEval.getSubQueryNode(),
NodeType.PROJECTION);
+        // Insert an aggregation operator rather than just setting the distinct flag of the
ProjectionNode
+        // because the performance of distinct aggregation is poor.
+        insertDistinctOperator(plan, childBlock, projectionNode, projectionNode.getChild());
+
+        Schema inSchema = SchemaUtil.merge(joinNode.getLeftChild().getOutSchema(),
+            joinNode.getRightChild().getOutSchema());
+        joinNode.setInSchema(inSchema);
+        joinNode.setOutSchema(node.getOutSchema()); // the join outputs what the selection node outputs
+
+        List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(inSchema));
+        joinNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+        block.addJoinType(joinType);
+        block.registerNode(joinNode);
+        plan.addHistory("IN subquery is rewritten into " + (eachIn.isNot() ? "anti" : "semi")
+ " join.");
+
+        // 3. set the created join as the base relation
+        baseRelation = joinNode; // so a following IN subquery joins on top of this join
+      }
+      
+      // 4. remove in quals (they are now expressed by the joins built above)
+      EvalNode[] originDnfs = AlgebraicUtil.toDisjunctiveNormalFormArray(node.getQual());
+      List<EvalNode> rewrittenDnfs = TUtil.newList();
+      for (EvalNode eachDnf : originDnfs) {
+        Set<EvalNode> cnfs = TUtil.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(eachDnf));
+        cnfs.removeAll(inSubqueries); // drop the rewritten IN predicates from this conjunction
+        if (!cnfs.isEmpty()) { // NOTE(review): a disjunct made only of IN predicates disappears here while its join stays mandatory - confirm OR'ed IN subqueries are meant to be handled this way
+          rewrittenDnfs.add(AlgebraicUtil.createSingletonExprFromCNF(cnfs));
+        }
+      }
+      if (rewrittenDnfs.size() > 0) {
+        node.setQual(AlgebraicUtil.createSingletonExprFromDNF(rewrittenDnfs.toArray(new EvalNode[rewrittenDnfs.size()])));
+        // The current selection node is expected to be removed at the filter push down phase.
+        node.setChild(baseRelation);
+      } else {
+        PlannerUtil.replaceNode(plan, block.getRoot(), node, baseRelation); // no predicate left: the join chain takes the selection's place
+        block.unregisterNode(node);
+      }
+
+      return null;
+    }
+
+    private void insertDistinctOperator(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                        ProjectionNode projectionNode, LogicalNode child)
throws TajoException {
+      if (projectionNode.getChild().getType() != NodeType.GROUP_BY) { // nothing is inserted when the projection already sits on a group-by
+        Schema outSchema = projectionNode.getOutSchema();
+        GroupbyNode dupRemoval = plan.createNode(GroupbyNode.class);
+        dupRemoval.setChild(child);
+        dupRemoval.setInSchema(projectionNode.getInSchema());
+        dupRemoval.setTargets(PlannerUtil.schemaToTargets(outSchema));
+        dupRemoval.setGroupingColumns(outSchema.toArray()); // group on every output column of the projection
+
+        block.registerNode(dupRemoval);
+        block.setAggregationRequire();
+
+        projectionNode.setChild(dupRemoval); // the group-by now sits between the projection and its former child
+        projectionNode.setInSchema(dupRemoval.getOutSchema());
+      }
+    }
+
+    private EvalNode buildJoinCondition(EvalNode leftField, TableSubQueryNode subQueryNode)
{
+      FieldEval rightField = new FieldEval(subQueryNode.getOutSchema().getColumn(0)); // only the subquery's first output column is compared
+      return new BinaryEval(EvalType.EQUAL, leftField, rightField);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index a7cf85e..5322868 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -1124,7 +1124,7 @@ public class ProjectionPushDownRule extends
 
   @Override
   public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock
block,
-                                   TableSubQueryNode node, Stack<LogicalNode> stack)
throws TajoException {
+                                        TableSubQueryNode node, Stack<LogicalNode>
stack) throws TajoException {
     Context childContext = new Context(plan, upperContext.requiredSet);
     stack.push(node);
     LogicalNode child = super.visitTableSubQuery(childContext, plan, block, node, stack);

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
index 3a1d257..fa952ab 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeDeserializer.java
@@ -35,6 +35,7 @@ import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.plan.logical.TableSubQueryNode;
 import org.apache.tajo.plan.logical.WindowSpec;
 import org.apache.tajo.plan.serder.PlanProto.WinFunctionEvalSpec;
 
@@ -105,7 +106,7 @@ public class EvalNodeDeserializer {
 
         switch (type) {
         case IN:
-          current = new InEval(lhs, (RowConstantEval) rhs, binProto.getNegative());
+          current = new InEval(lhs, (ValueSetEval) rhs, binProto.getNegative());
           break;
         case LIKE: {
           PlanProto.PatternMatchEvalSpec patternMatchProto = protoNode.getPatternMatch();
@@ -142,6 +143,12 @@ public class EvalNodeDeserializer {
         }
         current = new RowConstantEval(values);
 
+      } else if (type == EvalType.SUBQUERY) {
+        PlanProto.SubqueryEval subqueryProto = protoNode.getSubquery();
+        TableSubQueryNode subQueryNode = (TableSubQueryNode) LogicalNodeDeserializer.deserialize(context,
evalContext,
+            subqueryProto.getSubquery());
+        current = new SubqueryEval(subQueryNode);
+
       } else if (type == EvalType.FIELD) {
         CatalogProtos.ColumnProto columnProto = protoNode.getField();
         current = new FieldEval(new Column(columnProto));

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
index a03b637..7de0b05 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeSerializer.java
@@ -307,6 +307,21 @@ public class EvalNodeSerializer
     return function;
   }
 
+  @Override
+  public EvalNode visitSubquery(EvalTreeProtoBuilderContext context, SubqueryEval subquery,
Stack<EvalNode> stack) {
+    super.visitSubquery(context, subquery, stack);
+
+    PlanProto.SubqueryEval.Builder subqueryBuilder = PlanProto.SubqueryEval.newBuilder();
+    subqueryBuilder.setSubquery(LogicalNodeSerializer.serialize(subquery.getSubQueryNode()));
+
+    PlanProto.EvalNode.Builder builder = createEvalBuilder(context, subquery);
+    builder.setSubquery(subqueryBuilder);
+
+    context.treeBuilder.addNodes(builder);
+
+    return subquery;
+  }
+
   private WindowFrame buildWindowFrame(WindowSpec.WindowFrame frame) {
     WindowFrame.Builder windowFrameBuilder = WindowFrame.newBuilder();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index dad9893..6ba525d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -437,6 +437,7 @@ public class LogicalNodeDeserializer {
     }
     scan.setInSchema(convertSchema(protoNode.getInSchema()));
     scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    scan.setNameResolveBase(scanProto.getNameResolveBase());
   }
 
   private static IndexScanNode convertIndexScan(OverridableConf context, EvalContext evalContext,
@@ -481,6 +482,7 @@ public class LogicalNodeDeserializer {
     if (proto.getTargetsCount() > 0) {
       tableSubQuery.setTargets(convertTargets(context, evalContext, proto.getTargetsList()));
     }
+    tableSubQuery.setNameResolveBase(proto.getNameResolveBase());
 
     return tableSubQuery;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index ae74e30..13d6433 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -443,6 +443,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     }
 
     scanBuilder.setBroadcast(scan.isBroadcastTable());
+    scanBuilder.setNameResolveBase(scan.isNameResolveBase());
     return scanBuilder;
   }
 
@@ -505,6 +506,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     if (node.hasTargets()) {
       builder.addAllTargets(ProtoUtil.<PlanProto.Target>toProtoObjects(node.getTargets()));
     }
+    builder.setNameResolveBase(node.isNameResolveBase());
 
     PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
     nodeBuilder.setTableSubQuery(builder);

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/util/ExprFinder.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/ExprFinder.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/ExprFinder.java
index bc9ec28..461ffd6 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/ExprFinder.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/ExprFinder.java
@@ -24,17 +24,18 @@ import org.apache.tajo.algebra.OpType;
 import org.apache.tajo.algebra.UnaryOperator;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
+import org.apache.tajo.util.TUtil;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
 public class ExprFinder extends SimpleAlgebraVisitor<ExprFinder.Context, Object> {
 
   static class Context {
-    Set<Expr> set = new HashSet<Expr>();
+    List<Expr> set = TUtil.newList();
     OpType targetType;
 
     Context(OpType type) {
@@ -43,17 +44,18 @@ public class ExprFinder extends SimpleAlgebraVisitor<ExprFinder.Context, Object>
   }
 
   public static <T extends Expr> Set<T> finds(Expr expr, OpType type) {
+    return (Set<T>) new HashSet<Expr>(findsInOrder(expr, type));
+  }
+
+  public static <T extends Expr> List<T> findsInOrder(Expr expr, OpType type) {
     Context context = new Context(type);
     ExprFinder finder = new ExprFinder();
-    Stack<Expr> stack = new Stack<Expr>();
-    stack.push(expr);
     try {
       finder.visit(context, new Stack<Expr>(), expr);
     } catch (TajoException e) {
       throw new TajoInternalError(e);
     }
-    stack.pop();
-    return (Set<T>) context.set;
+    return (List<T>) context.set;
   }
 
   public Object visit(Context ctx, Stack<Expr> stack, Expr expr) throws TajoException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index e4bf8bc..99e95be 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -992,4 +992,21 @@ public class PlannerUtil {
 
     return tableDescTobeCreated;
   }
+
+  /**
+   * Collects, in the order {@link ExprFinder#findsInOrder} reports them, every IN predicate in
+   * the given qual whose in-value is of type {@code OpType.SimpleTableSubquery}.
+   * @param qual the expression that is searched for IN predicates
+   * @return a list of the matching IN predicate expressions; empty when none match
+   */
+  public static List<Expr> extractInSubquery(Expr qual) {
+    List<Expr> inSubqueries = TUtil.newList();
+    for (Expr eachIn : ExprFinder.findsInOrder(qual, OpType.InPredicate)) {
+      InPredicate inPredicate = (InPredicate) eachIn;
+      if (inPredicate.getInValue().getType() == OpType.SimpleTableSubquery) {
+        inSubqueries.add(eachIn);
+      }
+    }
+    return inSubqueries;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
index f249def..57e8e3e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
@@ -19,12 +19,9 @@
 package org.apache.tajo.plan.verifier;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.error.Errors;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
@@ -40,20 +37,20 @@ import java.util.Stack;
 import static org.apache.tajo.plan.verifier.SyntaxErrorUtil.*;
 
 public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVerifier.Context, LogicalNode> {
-  public LogicalPlanVerifier(TajoConf conf, CatalogService catalog) {
+  public LogicalPlanVerifier() {
   }
 
   public static class Context {
     VerificationState state;
 
-    public Context(OverridableConf queryContext, VerificationState state) {
+    public Context(VerificationState state) {
       this.state = state;
     }
   }
 
-  public VerificationState verify(OverridableConf queryContext, VerificationState state, LogicalPlan plan)
+  public VerificationState verify(VerificationState state, LogicalPlan plan)
       throws TajoException {
-    Context context = new Context(queryContext, state);
+    Context context = new Context(state);
     visit(context, plan, plan.getRootBlock());
     return context.state;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
index 4854d7f..ab1f74d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
@@ -105,6 +105,12 @@ public abstract class SimpleAlgebraVisitor<CONTEXT, RESULT> extends BaseAlgebraV
   }
 
   @Override
+  public RESULT visitSimpleTableSubquery(CONTEXT ctx, Stack<Expr> stack, SimpleTableSubquery expr)
+      throws TajoException {
+    return super.visitSimpleTableSubquery(ctx, stack, expr);
+  }
+
+  @Override
   public RESULT visitRelationList(CONTEXT ctx, Stack<Expr> stack, RelationList expr) throws TajoException {
     return super.visitRelationList(ctx, stack, expr);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/042c3e88/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 5acbbee..da1e187 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -112,6 +112,7 @@ message ScanNode {
   repeated Target targets = 4;
   optional EvalNodeTree qual = 5;
   optional bool broadcast = 6;
+  required bool nameResolveBase = 7;
 }
 
 message PartitionScanSpec {
@@ -190,6 +191,7 @@ message TableSubQueryNode {
   required int32 childSeq = 1;
   required string tableName = 2;
   repeated Target targets = 3;
+  required bool nameResolveBase = 4;
 }
 
 message ProjectionNode {
@@ -396,6 +398,8 @@ enum EvalType {
   ROW_CONSTANT = 32;
   FIELD = 33;
   CONST = 34;
+
+  SUBQUERY = 35;
 }
 
 message EvalNodeTree {
@@ -419,6 +423,7 @@ message EvalNode {
   optional CaseWhenEval casewhen = 13;
   optional IfCondEval ifCond = 14;
   optional PatternMatchEvalSpec patternMatch = 15;
+  optional SubqueryEval subquery = 16;
 }
 
 message UnaryEval {
@@ -438,6 +443,10 @@ message PatternMatchEvalSpec { // requires BinaryEval
   optional bool caseSensitive = 1;
 }
 
+message SubqueryEval {
+  required LogicalNodeTree subquery = 1;
+}
+
 message BetweenEval {
   required int32 predicand = 1;
   required int32 begin = 2;


Mime
View raw message