tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [25/50] [abbrv] git commit: TAJO-853: Refactoring FilterPushDown for OUTER JOIN. (Hyoungjun Kim via hyunsik)
Date Wed, 09 Jul 2014 04:10:42 GMT
TAJO-853: Refactoring FilterPushDown for OUTER JOIN. (Hyoungjun Kim via hyunsik)

Closes #28


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8cf508aa
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8cf508aa
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8cf508aa

Branch: refs/heads/window_function
Commit: 8cf508aad4cfa631f86229d7ed2875e2a0929c70
Parents: be21bc7
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Jun 24 15:59:37 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Jun 24 15:59:37 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +
 .../apache/tajo/engine/eval/CaseWhenEval.java   |   4 +-
 .../tajo/engine/planner/LogicalOptimizer.java   |  24 +-
 .../join/GreedyHeuristicJoinOrderAlgorithm.java |  43 +-
 .../engine/planner/logical/join/JoinEdge.java   |   1 -
 .../engine/planner/logical/join/JoinGraph.java  |  76 +--
 .../planner/physical/HashLeftOuterJoinExec.java |  18 +-
 .../physical/RightOuterMergeJoinExec.java       |  29 +-
 .../engine/planner/physical/SeqScanExec.java    |   6 +-
 .../planner/rewrite/FilterPushDownRule.java     | 130 ++++--
 .../org/apache/tajo/master/GlobalEngine.java    |   2 +-
 .../master/querymaster/QueryMasterTask.java     |   2 +-
 .../org/apache/tajo/master/session/Session.java |  10 +
 .../java/org/apache/tajo/QueryTestCaseBase.java |  17 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  44 +-
 .../query/TestJoinOnPartitionedTables.java      |  24 +-
 .../apache/tajo/engine/query/TestJoinQuery.java | 460 ++++++++++++++++++-
 ...estFilterPushDownPartitionColumnCaseWhen.sql |   5 +
 .../testPartialFilterPushDownOuterJoin.sql      |   4 +
 .../testPartialFilterPushDownOuterJoin2.sql     |   5 +
 .../TestJoinQuery/testWhereClauseJoin1.sql      |   3 +-
 .../TestJoinQuery/testWhereClauseJoin2.sql      |   3 +-
 .../TestJoinQuery/testWhereClauseJoin3.sql      |   3 +-
 .../TestJoinQuery/testWhereClauseJoin4.sql      |   3 +-
 ...FilterPushDownPartitionColumnCaseWhen.result |   4 +
 .../testPartialFilterPushDownOuterJoin.result   |  27 ++
 .../testPartialFilterPushDownOuterJoin2.result  |  27 ++
 .../TestJoinQuery/testWhereClauseJoin1.result   |   8 +-
 .../TestJoinQuery/testWhereClauseJoin2.result   |   8 +-
 .../TestJoinQuery/testWhereClauseJoin3.result   |   8 +-
 .../TestJoinQuery/testWhereClauseJoin4.result   |   8 +-
 .../org/apache/tajo/storage/NullScanner.java    |  62 +++
 .../org/apache/tajo/storage/StorageManager.java |  11 +
 .../tajo/storage/v2/StorageManagerV2.java       |  12 +
 35 files changed, 969 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c9fc1ee..e8507b0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-853: Refactoring FilterPushDown for OUTER JOIN.
+    (Hyoungjun Kim via hyunsik)
+
     TAJO-840: Improve query result print with counting empty table. (jaehwa)
 
     TAJO-844: JDBC should be support getTime, getDate, and getTimestamp. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 3f2b16f..675de3f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -337,6 +337,9 @@ public class TajoConf extends Configuration {
     // FILE FORMAT
     CSVFILE_NULL("tajo.csvfile.null", "\\\\N"),
 
+    //OPTIMIZER
+    OPTIMIZER_JOIN_ENABLE("tajo.optimizer.join.enable", true),
+
     // DEBUG OPTION
     TAJO_DEBUG("tajo.debug", false)
     ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
index 85b5bc0..cf1acdf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
@@ -240,8 +240,8 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
     @Override
     public Object clone() throws CloneNotSupportedException {
       IfThenEval ifThenEval = (IfThenEval) super.clone();
-      ifThenEval.condition = condition;
-      ifThenEval.result = result;
+      ifThenEval.condition = (EvalNode)condition.clone();
+      ifThenEval.result = (EvalNode)result.clone();
       return ifThenEval;
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 3bf70a7..0480fe9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -20,6 +20,8 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.conf.TajoConf;
@@ -36,6 +38,7 @@ import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
 import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
 import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
+import org.apache.tajo.master.session.Session;
 
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -49,6 +52,8 @@ import static org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrd
  */
 @InterfaceStability.Evolving
 public class LogicalOptimizer {
+  private static final Log LOG = LogFactory.getLog(LogicalOptimizer.class.getName());
+
   private BasicQueryRewriteEngine rulesBeforeJoinOpt;
   private BasicQueryRewriteEngine rulesAfterToJoinOpt;
   private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
@@ -65,15 +70,23 @@ public class LogicalOptimizer {
   }
 
   public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
+    return optimize(null, plan);
+  }
+
+  public LogicalNode optimize(Session session, LogicalPlan plan) throws PlanningException {
     rulesBeforeJoinOpt.rewrite(plan);
 
     DirectedGraphCursor<String, BlockEdge> blockCursor =
         new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName());
 
-    while(blockCursor.hasNext()) {
-      optimizeJoinOrder(plan, blockCursor.nextBlock());
+    if (session == null || "true".equals(session.getVariable(ConfVars.OPTIMIZER_JOIN_ENABLE.varname, "true"))) {
+      // default is true
+      while (blockCursor.hasNext()) {
+        optimizeJoinOrder(plan, blockCursor.nextBlock());
+      }
+    } else {
+      LOG.info("Skip Join Optimized.");
     }
-
     rulesAfterToJoinOpt.rewrite(plan);
     return plan.getRootBlock().getRoot();
   }
@@ -91,6 +104,8 @@ public class LogicalOptimizer {
       // finding join order and restore remain filter order
       FoundJoinOrder order = joinOrderAlgorithm.findBestOrder(plan, block,
           joinGraphContext.joinGraph, joinGraphContext.relationsForProduct);
+
+      // replace join node with FoundJoinOrder.
       JoinNode newJoinNode = order.getOrderedJoin();
       JoinNode old = PlannerUtil.findTopNode(block.getRoot(), NodeType.JOIN);
 
@@ -103,8 +118,9 @@ public class LogicalOptimizer {
       } else {
         newJoinNode.setTargets(targets.toArray(new Target[targets.size()]));
       }
-
       PlannerUtil.replaceNode(plan, block.getRoot(), old, newJoinNode);
+      // End of replacement logic
+
       String optimizedOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
       block.addPlanHistory("Non-optimized join order: " + originalOrder + " (cost: " + nonOptimizedJoinCost + ")");
       block.addPlanHistory("Optimized join order    : " + optimizedOrder + " (cost: " + order.getCost() + ")");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index f2bcd77..f65fee7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -26,9 +26,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.util.TUtil;
 
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.*;
 
 /**
  * This is a greedy heuristic algorithm to find a bushy join tree. This algorithm finds
@@ -166,6 +166,45 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
       throws PlanningException {
     JoinEdge foundJoinEdge = null;
 
+    // If outer is outer join, make edge key using all relation names in outer.
+    SortedSet<String> relationNames =
+        new TreeSet<String>(PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer));
+    String outerEdgeKey = TUtil.collectionToString(relationNames);
+    for (String innerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner)) {
+      if (graph.hasEdge(outerEdgeKey, innerName)) {
+        JoinEdge existJoinEdge = graph.getEdge(outerEdgeKey, innerName);
+        if (foundJoinEdge == null) {
+          foundJoinEdge = new JoinEdge(existJoinEdge.getJoinType(), outer, inner,
+              existJoinEdge.getJoinQual());
+        } else {
+          foundJoinEdge.addJoinQual(AlgebraicUtil.createSingletonExprFromCNF(
+              existJoinEdge.getJoinQual()));
+        }
+      }
+    }
+    if (foundJoinEdge != null) {
+      return foundJoinEdge;
+    }
+
+    relationNames =
+        new TreeSet<String>(PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner));
+    outerEdgeKey = TUtil.collectionToString(relationNames);
+    for (String outerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer)) {
+      if (graph.hasEdge(outerEdgeKey, outerName)) {
+        JoinEdge existJoinEdge = graph.getEdge(outerEdgeKey, outerName);
+        if (foundJoinEdge == null) {
+          foundJoinEdge = new JoinEdge(existJoinEdge.getJoinType(), inner, outer,
+              existJoinEdge.getJoinQual());
+        } else {
+          foundJoinEdge.addJoinQual(AlgebraicUtil.createSingletonExprFromCNF(
+              existJoinEdge.getJoinQual()));
+        }
+      }
+    }
+    if (foundJoinEdge != null) {
+      return foundJoinEdge;
+    }
+
     for (String outerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer)) {
       for (String innerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner)) {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
index e5c29f0..c9bb571 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.RelationNode;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index d384f51..6e321f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.logical.join;
 
 import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.AlgebraicUtil;
@@ -31,9 +32,9 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
 import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.util.TUtil;
 
-import java.util.Collection;
-import java.util.Set;
+import java.util.*;
 
 public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
 
@@ -76,41 +77,54 @@ public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
 
     return relationNames;
   }
+
   public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       JoinNode joinNode) throws PlanningException {
-    Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
-    Set<EvalNode> nonJoinQuals = Sets.newHashSet();
-    for (EvalNode singleQual : cnf) {
-      if (EvalTreeUtil.isJoinQual(singleQual, true)) {
-
-        String [] relations = guessRelationsFromJoinQual(block, (BinaryEval) singleQual);
-        String leftExprRelName = relations[0];
-        String rightExprRelName = relations[1];
-
-        Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
-
-        boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelName);
-        JoinEdge edge;
-        edge = getEdge(leftExprRelName, rightExprRelName);
-
-        if (edge != null) {
-          edge.addJoinQual(singleQual);
-        } else {
-          if (isLeftExprForLeftTable) {
-            edge = new JoinEdge(joinNode.getJoinType(),
-                block.getRelation(leftExprRelName), block.getRelation(rightExprRelName), singleQual);
-            addEdge(leftExprRelName, rightExprRelName, edge);
+    if (joinNode.getJoinType() == JoinType.LEFT_OUTER || joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+      JoinEdge edge = new JoinEdge(joinNode.getJoinType(),
+            joinNode.getLeftChild(), joinNode.getRightChild(), joinNode.getJoinQual());
+
+      SortedSet<String> leftNodeRelationName =
+          new TreeSet<String>(PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild()));
+      SortedSet<String> rightNodeRelationName =
+          new TreeSet<String>(PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getRightChild()));
+
+      addEdge(TUtil.collectionToString(leftNodeRelationName), TUtil.collectionToString(rightNodeRelationName), edge);
+
+      Set<EvalNode> allInOneCnf = new HashSet<EvalNode>();
+      allInOneCnf.add(joinNode.getJoinQual());
+
+      return allInOneCnf;
+    } else {
+      Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
+
+      for (EvalNode singleQual : cnf) {
+        if (EvalTreeUtil.isJoinQual(singleQual, true)) {
+          String[] relations = guessRelationsFromJoinQual(block, (BinaryEval) singleQual);
+          String leftExprRelName = relations[0];
+          String rightExprRelName = relations[1];
+
+          Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
+
+          boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelName);
+
+          JoinEdge edge = getEdge(leftExprRelName, rightExprRelName);
+          if (edge != null) {
+            edge.addJoinQual(singleQual);
           } else {
-            edge = new JoinEdge(joinNode.getJoinType(),
-                block.getRelation(rightExprRelName), block.getRelation(leftExprRelName), singleQual);
-            addEdge(rightExprRelName, leftExprRelName, edge);
+            if (isLeftExprForLeftTable) {
+              edge = new JoinEdge(joinNode.getJoinType(),
+                  block.getRelation(leftExprRelName), block.getRelation(rightExprRelName), singleQual);
+              addEdge(leftExprRelName, rightExprRelName, edge);
+            } else {
+              edge = new JoinEdge(joinNode.getJoinType(),
+                  block.getRelation(rightExprRelName), block.getRelation(leftExprRelName), singleQual);
+              addEdge(rightExprRelName, leftExprRelName, edge);
+            }
           }
         }
-      } else {
-        nonJoinQuals.add(singleQual);
       }
+      return cnf;
     }
-    cnf.retainAll(nonJoinQuals);
-    return cnf;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 849dc38..622900f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -141,18 +141,20 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
-      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
-        projector.eval(frameTuple, outTuple);
-        found = true;
-      }
-
       if (!iterator.hasNext()) { // no more right tuples for this hash key
         shouldGetLeftTuple = true;
       }
 
-      if (found) {
-        break;
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
+      } else {
+        // null padding
+        Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+        frameTuple.set(leftTuple, nullPaddedTuple);
+        projector.eval(frameTuple, outTuple);
+        return outTuple;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index c70174a..365fc22 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -25,6 +25,7 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
@@ -298,10 +299,16 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
           posRightTupleSlots = posRightTupleSlots + 1;
 
           frameTuple.set(nextLeft, aTuple);
-          joinQual.eval(inSchema, frameTuple);
-          projector.eval(frameTuple, outTuple);
-          return outTuple;
-
+          if (joinQual.eval(inSchema, frameTuple).asBool()) {
+            projector.eval(frameTuple, outTuple);
+            return outTuple;
+          } else {
+            // padding null
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, aTuple);
+            projector.eval(frameTuple, outTuple);
+            return outTuple;
+          }
         } else {
           // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
           if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
@@ -313,9 +320,17 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(nextLeft, aTuple);
-            joinQual.eval(inSchema, frameTuple);
-            projector.eval(frameTuple, outTuple);
-            return outTuple;
+
+            if (joinQual.eval(inSchema, frameTuple).asBool()) {
+              projector.eval(frameTuple, outTuple);
+              return outTuple;
+            } else {
+              // padding null
+              Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+              frameTuple.set(nullPaddedTuple, aTuple);
+              projector.eval(frameTuple, outTuple);
+              return outTuple;
+            }
           }
         }
       } // the second if end false

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index a45cd7b..507cb6c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -233,8 +233,10 @@ public class SeqScanExec extends PhysicalExec {
       }
     }
 
-    scanner.close();
-    scanner = null;
+    if (scanner != null) {
+      scanner.close();
+      scanner = null;
+    }
 
     TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index ed46e25..4215423 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
@@ -154,9 +155,6 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   @Override
   public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
                                Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode left = joinNode.getRightChild();
-    LogicalNode right = joinNode.getLeftChild();
-
     // here we should stop selection pushdown on the null supplying side(s) of an outer join
     // get the two operands of the join operation as well as the join type
     JoinType joinType = joinNode.getJoinType();
@@ -217,47 +215,89 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;
 
-
-
-    List<EvalNode> outerJoinConditionEvals = new ArrayList<EvalNode>();
-    List<EvalNode> outerJoinFilterEvalsExcludeJoinCondition = new ArrayList<EvalNode>();
+    List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
+    List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
     if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
+      // TAJO-853
       // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
       // That filters are processed by SELECTION NODE.
-      for (EvalNode eachEval: context.pushingDownFilters) {
-        if (isTopMostJoin && !EvalTreeUtil.isJoinQual(eachEval, true)) {
-          outerJoinFilterEvalsExcludeJoinCondition.add(eachEval);
+      Set<String> nullSupplyingTableNameSet;
+      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+      } else {
+        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+      }
+
+      Set<String> preservedTableNameSet;
+      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+      } else {
+        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+      }
 
+      List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
+      for (EvalNode eachEval: context.pushingDownFilters) {
+        if (EvalTreeUtil.isJoinQual(eachEval, true)) {
+          outerJoinPredicationEvals.add(eachEval);
+          removedFromFilter.add(eachEval);
         } else {
-          outerJoinConditionEvals.add(eachEval);
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
+          boolean canPushDown = true;
+          for (Column eachColumn: columns) {
+            if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
+              canPushDown = false;
+              break;
+            }
+          }
+          if (!canPushDown) {
+            outerJoinFilterEvalsExcludePredication.add(eachEval);
+            removedFromFilter.add(eachEval);
+          }
         }
       }
 
+      context.pushingDownFilters.removeAll(removedFromFilter);
+
       for (EvalNode eachOnEval: onConditions) {
         if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
           // If join condition, processing in the JoinNode.
-          outerJoinConditionEvals.add(eachOnEval);
+          outerJoinPredicationEvals.add(eachOnEval);
         } else {
-          // TODO pushdown(Null Supplying table) or add join condition(Preserved Row table)
-          // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
-          throw new PlanningException("Currently not support filter condition ON clause in the case of OUTER JOIN.");
+          // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
+          boolean canPushDown = true;
+          for (Column eachColumn: columns) {
+            if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
+              canPushDown = false;
+              break;
+            }
+          }
+          if (canPushDown) {
+            context.pushingDownFilters.add(eachOnEval);
+          } else {
+            outerJoinPredicationEvals.add(eachOnEval);
+          }
         }
       }
     } else {
       context.pushingDownFilters.addAll(onConditions);
     }
 
+    LogicalNode left = joinNode.getLeftChild();
+    LogicalNode right = joinNode.getRightChild();
+
     List<EvalNode> notMatched = new ArrayList<EvalNode>();
     // Join's input schema = right child output columns + left child output columns
-    Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, joinNode, left, notMatched, true,
-        right.getOutSchema().size());
+    Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, joinNode, left, notMatched, null, true,
+        0);
     context.setFiltersTobePushed(transformedMap.keySet());
     visit(context, plan, block, left, stack);
 
     context.setToOrigin(transformedMap);
     context.addFiltersTobePushed(notMatched);
 
-    transformedMap = findCanPushdownAndTransform(context, joinNode, right, notMatched, true, 0);
+    notMatched.clear();
+    transformedMap = findCanPushdownAndTransform(context, joinNode, right, notMatched, null, true, left.getOutSchema().size());
     context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
 
     visit(context, plan, block, right, stack);
@@ -265,9 +305,10 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     context.setToOrigin(transformedMap);
     context.addFiltersTobePushed(notMatched);
 
+    notMatched.clear();
     List<EvalNode> matched = Lists.newArrayList();
     if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
-      matched.addAll(outerJoinConditionEvals);
+      matched.addAll(outerJoinPredicationEvals);
     } else {
       for (EvalNode eval : context.pushingDownFilters) {
         if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
@@ -295,7 +336,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
       context.pushingDownFilters.removeAll(matched);
     }
 
-    context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludeJoinCondition);
+    context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
     return joinNode;
   }
 
@@ -472,7 +513,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     //copy -> origin
     Map<EvalNode, EvalNode> matched = findCanPushdownAndTransform(
-        context, projectionNode, childNode, notMatched, false, 0);
+        context, projectionNode, childNode, notMatched, null, false, 0);
 
     context.setFiltersTobePushed(matched.keySet());
 
@@ -515,6 +556,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   private Map<EvalNode, EvalNode> findCanPushdownAndTransform(
       FilterPushDownContext context, Projectable node,
       LogicalNode childNode, List<EvalNode> notMatched,
+      Set<String> partitionColumns,
       boolean ignoreJoin, int columnOffset) throws PlanningException {
     // canonical name -> target
     Map<String, Target> nodeTargetMap = new HashMap<String, Target>();
@@ -547,7 +589,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
       if (columnMatched) {
         // transform eval column to child's output column
-        EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, columnOffset);
+        EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, partitionColumns, columnOffset);
         if (copyEvalNode != null) {
           matched.put(copyEvalNode, eval);
         } else {
@@ -562,7 +604,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
-                                 Map<String, Target> targetMap, int columnOffset) throws PlanningException {
+                                 Map<String, Target> targetMap, Set<String> partitionColumns,
+                                 int columnOffset) throws PlanningException {
     Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();
     EvalNode copy;
     try {
@@ -601,11 +644,28 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
         index = index - columnOffset;
       }
       if (index < 0 || index >= outputSchema.size()) {
-        return null;
+        if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
+          ScanNode scanNode = (ScanNode)node;
+          boolean isPartitionColumn = false;
+          if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
+            isPartitionColumn = partitionColumns.contains(
+                CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
+          } else {
+            isPartitionColumn = partitionColumns.contains(c.getSimpleName());
+          }
+          if (isPartitionColumn) {
+            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
+                scanNode.getCanonicalName() + "." + c.getSimpleName());
+          } else {
+            return null;
+          }
+        } else {
+          return null;
+        }
+      } else {
+        Column outputColumn = outputSchema.getColumn(index);
+        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
       }
-      Column outputColumn = outputSchema.getColumn(index);
-
-      EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
     }
 
     return copy;
@@ -713,7 +773,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     List<EvalNode> notMatched = new ArrayList<EvalNode>();
     // transform
     Map<EvalNode, EvalNode> tranformed =
-        findCanPushdownAndTransform(context, groupbyNode,groupbyNode.getChild(), notMatched, false, 0);
+        findCanPushdownAndTransform(context, groupbyNode,groupbyNode.getChild(), notMatched, null, false, 0);
 
     context.setFiltersTobePushed(tranformed.keySet());
     LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);
@@ -733,9 +793,11 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     // find partition column and check matching
     Set<String> partitionColumns = new HashSet<String>();
     TableDesc table = scanNode.getTableDesc();
+    boolean hasQualifiedName = false;
     if (table.hasPartition()) {
       for (Column c: table.getPartitionMethod().getExpressionSchema().getColumns()) {
         partitionColumns.add(c.getQualifiedName());
+        hasQualifiedName = c.hasQualifier();
       }
     }
     Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
@@ -747,7 +809,15 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
         }
         Column column = columns.iterator().next();
 
-        if (partitionColumns.contains(column.getSimpleName())) {
+        // If catalog runs with HCatalog, partition column is a qualified name
+        // Else partition column is a simple name
+        boolean isPartitionColumn = false;
+        if (hasQualifiedName) {
+          isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
+        } else {
+          isPartitionColumn = partitionColumns.contains(column.getSimpleName());
+        }
+        if (isPartitionColumn) {
           EvalNode copy;
           try {
             copy = (EvalNode) eval.clone();
@@ -768,7 +838,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     // transform
     Map<EvalNode, EvalNode> transformed =
-        findCanPushdownAndTransform(context, scanNode, null, notMatched, true, 0);
+        findCanPushdownAndTransform(context, scanNode, null, notMatched, partitionColumns, true, 0);
 
     for (EvalNode eval : transformed.keySet()) {
       if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 09e0e01..db82dfa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -483,7 +483,7 @@ public class GlobalEngine extends AbstractService {
       LOG.debug("Non Optimized Query: \n" + plan.toString());
       LOG.debug("=============================================");
     }
-    optimizer.optimize(plan);
+    optimizer.optimize(session, plan);
     LOG.info("=============================================");
     LOG.info("Optimized Query: \n" + plan.toString());
     LOG.info("=============================================");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index ecf2202..271864c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -329,7 +329,7 @@ public class QueryMasterTask extends CompositeService {
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       LogicalPlan plan = planner.createPlan(session, expr);
-      optimizer.optimize(plan);
+      optimizer.optimize(session, plan);
 
       GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
       hookManager.addHook(new GlobalEngine.InsertHook());

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
index c60f50f..a67b6c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -85,6 +85,16 @@ public class Session implements SessionConstants, ProtoObject<SessionProto> {
     }
   }
 
+  public String getVariable(String name, String defaultValue) {
+    synchronized (sessionVariables) {
+      if (sessionVariables.containsKey(name)) {
+        return sessionVariables.get(name);
+      } else {
+        return defaultValue;
+      }
+    }
+  }
+
   public void removeVariable(String name) {
     synchronized (sessionVariables) {
       sessionVariables.remove(name);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 1d7d0ff..70c73f9 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -236,11 +236,20 @@ public class QueryTestCaseBase {
    * @return ResultSet of query execution.
    */
   public ResultSet executeQuery() throws Exception {
-    return executeFile(name.getMethodName() + ".sql");
+    return executeFile(getMethodName() + ".sql");
+  }
+
+  private String getMethodName() {
+    String methodName = name.getMethodName();
+    // In the case of parameter execution name's pattern is methodName[0]
+    if (methodName.endsWith("]")) {
+      methodName = methodName.substring(0, methodName.length() - 3);
+    }
+    return methodName;
   }
 
   public ResultSet executeJsonQuery() throws Exception {
-    return executeJsonFile(name.getMethodName() + ".json");
+    return executeJsonFile(getMethodName() + ".json");
   }
 
   /**
@@ -281,7 +290,7 @@ public class QueryTestCaseBase {
    * @param result Query result to be compared.
    */
   public final void assertResultSet(ResultSet result) throws IOException {
-    assertResultSet("Result Verification", result, name.getMethodName() + ".result");
+    assertResultSet("Result Verification", result, getMethodName() + ".result");
   }
 
   /**
@@ -314,7 +323,7 @@ public class QueryTestCaseBase {
   }
 
   public final void assertStrings(String actual) throws IOException {
-    assertStrings(actual, name.getMethodName() + ".result");
+    assertStrings(actual, getMethodName() + ".result");
   }
 
   public final void assertStrings(String actual, String resultFileName) throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 06d2bab..0237639 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -579,22 +579,46 @@ public class TajoTestingCluster {
         getStorageManager().getWarehouseDir();
     fs.mkdirs(rootDir);
     for (int i = 0; i < names.length; i++) {
-      Path tablePath = new Path(rootDir, names[i]);
-      fs.mkdirs(tablePath);
-      Path dfsPath = new Path(tablePath, names[i] + ".tbl");
-      FSDataOutputStream out = fs.create(dfsPath);
-      for (int j = 0; j < tables[i].length; j++) {
-        out.write((tables[i][j]+"\n").getBytes());
-      }
-      out.close();
-      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
-      client.createExternalTable(names[i], schemas[i], tablePath, meta);
+      createTable(names[i], schemas[i], tableOption, tables[i]);
     }
     Thread.sleep(1000);
     ResultSet res = client.executeQueryAndGetResult(query);
     return res;
   }
 
+  public static void createTable(String tableName, Schema schema,
+                                 KeyValueSet tableOption, String[] tableDatas) throws Exception {
+    TpchTestBase instance = TpchTestBase.getInstance();
+    TajoTestingCluster util = instance.getTestingCluster();
+    while(true) {
+      if(util.getMaster().isMasterRunning()) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    TajoConf conf = util.getConfiguration();
+    TajoClient client = new TajoClient(conf);
+
+    FileSystem fs = util.getDefaultFileSystem();
+    Path rootDir = util.getMaster().
+        getStorageManager().getWarehouseDir();
+    if (!fs.exists(rootDir)) {
+      fs.mkdirs(rootDir);
+    }
+    Path tablePath = new Path(rootDir, tableName);
+    fs.mkdirs(tablePath);
+    if (tableDatas.length > 0) {
+      Path dfsPath = new Path(tablePath, tableName + ".tbl");
+      FSDataOutputStream out = fs.create(dfsPath);
+      for (int j = 0; j < tableDatas.length; j++) {
+        out.write((tableDatas[j] + "\n").getBytes());
+      }
+      out.close();
+    }
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
+    client.createExternalTable(tableName, schema, tablePath, meta);
+  }
+
     /**
     * Write lines to a file.
     *

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
index 3e28f9e..781f80a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -32,7 +32,6 @@ public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
 
   @Test
   public void testPartitionTableJoinSmallTable() throws Exception {
-
     executeDDL("customer_ddl.sql", null);
     ResultSet res = executeFile("insert_into_customer.sql");
     res.close();
@@ -52,5 +51,28 @@ public class TestJoinOnPartitionedTables extends QueryTestCaseBase {
     res = executeFile("testPartialFilterPushDown.sql");
     assertResultSet(res, "testPartialFilterPushDown.result");
     res.close();
+
+    res = executeFile("testPartialFilterPushDownOuterJoin.sql");
+    assertResultSet(res, "testPartialFilterPushDownOuterJoin.result");
+    res.close();
+
+    res = executeFile("testPartialFilterPushDownOuterJoin2.sql");
+    assertResultSet(res, "testPartialFilterPushDownOuterJoin2.result");
+    res.close();
+
+    executeString("DROP TABLE customer_parts PURGE").close();
+  }
+
+  @Test
+  public void testFilterPushDownPartitionColumnCaseWhen() throws Exception {
+    executeDDL("customer_ddl.sql", null);
+    ResultSet res = executeFile("insert_into_customer.sql");
+    res.close();
+
+    res = executeQuery();
+    assertResultSet(res);
+    res.close();
+
+    executeString("DROP TABLE customer_parts PURGE").close();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index cc5db52..f1d7f8d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -21,16 +21,68 @@ package org.apache.tajo.engine.query;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
 
 @Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
 public class TestJoinQuery extends QueryTestCaseBase {
 
-  public TestJoinQuery() {
+  public TestJoinQuery(String joinOption) {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
+
+    if ("Hash".equals(joinOption)) {
+      testingCluster.setAllTajoDaemonConfValue(
+          ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+    } else {
+      testingCluster.setAllTajoDaemonConfValue(
+          ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname, String.valueOf(1));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(1));
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+          String.valueOf(1));
+    }
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {"Hash"},
+        {"Sort"}
+    });
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // recover the default configuration.
+    testingCluster.setAllTajoDaemonConfValue(
+        ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+        String.valueOf(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+        String.valueOf(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        String.valueOf(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultLongVal));
   }
 
   @Test
@@ -205,9 +257,14 @@ public class TestJoinQuery extends QueryTestCaseBase {
   public void testOuterJoinAndCaseWhen1() throws Exception {
     executeDDL("oj_table1_ddl.sql", "table1");
     executeDDL("oj_table2_ddl.sql", "table2");
-    ResultSet res = executeQuery();
-    assertResultSet(res);
-    cleanupQuery(res);
+    try {
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE table1");
+      executeString("DROP TABLE table2");
+    }
   }
 
   @Test
@@ -397,4 +454,399 @@ public class TestJoinQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase1() throws Exception {
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id\n" +
+              "left outer join table13 t3\n" +
+              "on t1.id = t3.id and t2.id = t3.id");
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "1,table11-1,1,null\n" +
+              "2,table11-2,null,null\n" +
+              "3,table11-3,null,null\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase2() throws Exception {
+    // outer -> outer -> inner
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id, t4.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id\n" +
+              "left outer join table13 t3\n" +
+              "on t2.id = t3.id\n" +
+              "inner join table14 t4\n" +
+              "on t2.id = t4.id"
+      );
+
+      String expected =
+          "id,name,id,id,id\n" +
+              "-------------------------------\n" +
+              "1,table11-1,1,null,1\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase2_1() throws Exception {
+    // inner(on predication) -> outer(on predication) -> outer -> where
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id, t4.id\n" +
+              "from table11 t1\n" +
+              "inner join table14 t4\n" +
+              "on t1.id = t4.id and t4.id > 1\n" +
+              "left outer join table13 t3\n" +
+              "on t4.id = t3.id and t3.id = 2\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id \n" +
+              "where t1.id > 1"
+      );
+
+      String expected =
+          "id,name,id,id,id\n" +
+              "-------------------------------\n" +
+              "2,table11-2,null,2,2\n" +
+              "3,table11-3,null,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase3() throws Exception {
+    // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+    // Case J1: Join Predicate on Preserved Row Table
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2 \n" +
+              "on t1.id = t2.id and (concat(t1.name, cast(t2.id as TEXT)) = 'table11-11' or concat(t1.name, cast(t2.id as TEXT)) = 'table11-33')\n" +
+              "left outer join table13 t3\n" +
+              "on t1.id = t3.id "
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "1,table11-1,1,null\n" +
+              "2,table11-2,null,2\n" +
+              "3,table11-3,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase4() throws Exception {
+    // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+    // Case J2: Join Predicate on Null Supplying Table
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id and t2.id > 1 \n" +
+              "left outer join table13 t3\n" +
+              "on t1.id = t3.id"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "1,table11-1,null,null\n" +
+              "2,table11-2,null,2\n" +
+              "3,table11-3,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase5() throws Exception {
+    // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+    // Case W1: Where Predicate on Preserved Row Table
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id\n" +
+              "left outer join table13 t3\n" +
+              "on t1.id = t3.id\n" +
+              "where t1.name > 'table11-1'"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "2,table11-2,null,2\n" +
+              "3,table11-3,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinPredicationCaseByCase6() throws Exception {
+    // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+    // Case W2: Where Predicate on Null Supplying Table
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "left outer join table12 t2\n" +
+              "on t1.id = t2.id\n" +
+              "left outer join table13 t3\n" +
+              "on t1.id = t3.id\n" +
+              "where t3.id > 2"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "3,table11-3,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterWithEmptyTable() throws Exception {
+    // https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+    // Case W2: Where Predicate on Null Supplying Table
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id\n" +
+              "from table11 t1\n" +
+              "left outer join table15 t2\n" +
+              "on t1.id = t2.id"
+      );
+
+      String expected =
+          "id,name,id\n" +
+              "-------------------------------\n" +
+              "1,table11-1,null\n" +
+              "2,table11-2,null\n" +
+              "3,table11-3,null\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testRightOuterJoinPredicationCaseByCase1() throws Exception {
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "right outer join table12 t2\n" +
+              "on t1.id = t2.id\n" +
+              "right outer join table13 t3\n" +
+              "on t1.id = t3.id and t2.id = t3.id"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "null,null,null,2\n" +
+              "null,null,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testRightOuterJoinPredicationCaseByCase2() throws Exception {
+    // inner -> right
+    // Notice: Join order should be preserved with origin order.
+    // JoinEdge: t1 -> t4, t3 -> t1,t4
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t3.id, t4.id\n" +
+              "from table11 t1\n" +
+              "inner join table14 t4\n" +
+              "on t1.id = t4.id and t4.id > 1\n" +
+              "right outer join table13 t3\n" +
+              "on t4.id = t3.id and t3.id = 2\n" +
+              "where t3.id > 1"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "2,table11-2,2,2\n" +
+              "null,null,3,null\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testRightOuterJoinPredicationCaseByCase3() throws Exception {
+    createOuterJoinTestTable();
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t2.id, t3.id\n" +
+              "from table11 t1\n" +
+              "right outer join table12 t2 \n" +
+              "on t1.id = t2.id and (concat(t1.name, cast(t2.id as TEXT)) = 'table11-11' or concat(t1.name, cast(t2.id as TEXT)) = 'table11-33')\n" +
+              "right outer join table13 t3\n" +
+              "on t1.id = t3.id "
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "null,null,null,2\n" +
+              "null,null,null,3\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  @Test
+  public final void testFullOuterJoinPredicationCaseByCase1() throws Exception {
+    createOuterJoinTestTable();
+
+    try {
+      ResultSet res = executeString(
+          "select t1.id, t1.name, t3.id, t4.id\n" +
+              "from table11 t1\n" +
+              "full outer join table13 t3\n" +
+              "on t1.id = t3.id\n" +
+              "full outer join table14 t4\n" +
+              "on t3.id = t4.id \n" +
+              "order by t4.id"
+      );
+
+      String expected =
+          "id,name,id,id\n" +
+              "-------------------------------\n" +
+              "null,null,null,1\n" +
+              "2,table11-2,2,2\n" +
+              "3,table11-3,3,3\n" +
+              "null,null,null,4\n" +
+              "1,table11-1,null,null\n";
+
+      String result = resultSetToString(res);
+
+      assertEquals(expected, result);
+    } finally {
+      dropOuterJoinTestTable();
+    }
+  }
+
+  private void createOuterJoinTestTable() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    String[] data = new String[]{ "1|table11-1", "2|table11-2", "3|table11-3" };
+    TajoTestingCluster.createTable("table11", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    data = new String[]{ "1|table12-1" };
+    TajoTestingCluster.createTable("table12", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    data = new String[]{"2|table13-2", "3|table13-3" };
+    TajoTestingCluster.createTable("table13", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    data = new String[]{"1|table14-1", "2|table14-2", "3|table14-3", "4|table14-4" };
+    TajoTestingCluster.createTable("table14", schema, tableOptions, data);
+
+    schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    data = new String[]{};
+    TajoTestingCluster.createTable("table15", schema, tableOptions, data);
+  }
+
+  private void dropOuterJoinTestTable() throws Exception {
+    executeString("DROP TABLE table11 PURGE;");
+    executeString("DROP TABLE table12 PURGE;");
+    executeString("DROP TABLE table13 PURGE;");
+    executeString("DROP TABLE table14 PURGE;");
+    executeString("DROP TABLE table15 PURGE;");
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.sql b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.sql
new file mode 100644
index 0000000..951b831
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.sql
@@ -0,0 +1,5 @@
+select c_custkey, c_nationkey, c_name, o_custkey, (case when a.c_nationkey > 3 then 4 else 3 end)
+from customer_parts a
+inner join orders b
+on a.c_custkey = b.o_custkey
+where a.c_custkey = (case when a.c_name like 'Customer%' and a.c_nationkey > 3 then 4 else 3 end)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.sql b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.sql
new file mode 100644
index 0000000..bbb2c45
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.sql
@@ -0,0 +1,4 @@
+select a.n_nationkey, a.n_name, b.c_custkey, b.c_nationkey, b.c_name
+from nation a
+left outer join customer_parts b on a.n_nationkey = b.c_custkey
+and b.c_nationkey = 1

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.sql b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.sql
new file mode 100644
index 0000000..b30b2cb
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.sql
@@ -0,0 +1,5 @@
+-- In the case of no partition directory
+select a.n_nationkey, a.n_name, b.c_custkey, b.c_nationkey, b.c_name
+from nation a
+left outer join customer_parts b on a.n_nationkey = b.c_custkey
+and b.c_nationkey = 100

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
index 069be09..a7c5335 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
@@ -7,4 +7,5 @@ from
   nation,
   region
 where
-  n_regionkey = r_regionkey;
\ No newline at end of file
+  n_regionkey = r_regionkey
+order by n_name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
index 0c6539b..4bb1a82 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
@@ -5,4 +5,5 @@ from
   nation,
   region
 where
-  n_regionkey = r_regionkey;
\ No newline at end of file
+  n_regionkey = r_regionkey
+order by n_name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
index 6495958..631469b 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
@@ -6,4 +6,5 @@ select
 from
   nation, region
 where
-  n_regionkey = r_regionkey;
\ No newline at end of file
+  n_regionkey = r_regionkey
+order by n_name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
index d8ee615..08e6807 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
@@ -5,4 +5,5 @@ select
 from
   nation, region
 where
-  n_regionkey = r_regionkey;
\ No newline at end of file
+  n_regionkey = r_regionkey
+order by n_name
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.result b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.result
new file mode 100644
index 0000000..4ba41a3
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testFilterPushDownPartitionColumnCaseWhen.result
@@ -0,0 +1,4 @@
+c_custkey,c_nationkey,c_name,o_custkey,?casewhen
+-------------------------------
+3,1,Customer#000000003,3,3
+4,4,Customer#000000004,4,4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.result b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.result
new file mode 100644
index 0000000..73150a0
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin.result
@@ -0,0 +1,27 @@
+n_nationkey,n_name,c_custkey,c_nationkey,c_name
+-------------------------------
+0,ALGERIA,null,null,null
+1,ARGENTINA,null,null,null
+2,BRAZIL,null,null,null
+3,CANADA,3,1,Customer#000000003
+4,EGYPT,null,null,null
+5,ETHIOPIA,null,null,null
+6,FRANCE,null,null,null
+7,GERMANY,null,null,null
+8,INDIA,null,null,null
+9,INDONESIA,null,null,null
+10,IRAN,null,null,null
+11,IRAQ,null,null,null
+12,JAPAN,null,null,null
+13,JORDAN,null,null,null
+14,KENYA,null,null,null
+15,MOROCCO,null,null,null
+16,MOZAMBIQUE,null,null,null
+17,PERU,null,null,null
+18,CHINA,null,null,null
+19,ROMANIA,null,null,null
+20,SAUDI ARABIA,null,null,null
+21,VIETNAM,null,null,null
+22,RUSSIA,null,null,null
+23,UNITED KINGDOM,null,null,null
+24,UNITED STATES,null,null,null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.result b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.result
new file mode 100644
index 0000000..442ac75
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinOnPartitionedTables/testPartialFilterPushDownOuterJoin2.result
@@ -0,0 +1,27 @@
+n_nationkey,n_name,c_custkey,c_nationkey,c_name
+-------------------------------
+0,ALGERIA,null,null,null
+1,ARGENTINA,null,null,null
+2,BRAZIL,null,null,null
+3,CANADA,null,null,null
+4,EGYPT,null,null,null
+5,ETHIOPIA,null,null,null
+6,FRANCE,null,null,null
+7,GERMANY,null,null,null
+8,INDIA,null,null,null
+9,INDONESIA,null,null,null
+10,IRAN,null,null,null
+11,IRAQ,null,null,null
+12,JAPAN,null,null,null
+13,JORDAN,null,null,null
+14,KENYA,null,null,null
+15,MOROCCO,null,null,null
+16,MOZAMBIQUE,null,null,null
+17,PERU,null,null,null
+18,CHINA,null,null,null
+19,ROMANIA,null,null,null
+20,SAUDI ARABIA,null,null,null
+21,VIETNAM,null,null,null
+22,RUSSIA,null,null,null
+23,UNITED KINGDOM,null,null,null
+24,UNITED STATES,null,null,null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin1.result b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin1.result
index 5c54325..5691b50 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin1.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin1.result
@@ -4,6 +4,7 @@ ALGERIA,AFRICA,0,0
 ARGENTINA,AMERICA,1,1
 BRAZIL,AMERICA,1,1
 CANADA,AMERICA,1,1
+CHINA,ASIA,2,2
 EGYPT,MIDDLE EAST,4,4
 ETHIOPIA,AFRICA,0,0
 FRANCE,EUROPE,3,3
@@ -18,10 +19,9 @@ KENYA,AFRICA,0,0
 MOROCCO,AFRICA,0,0
 MOZAMBIQUE,AFRICA,0,0
 PERU,AMERICA,1,1
-CHINA,ASIA,2,2
 ROMANIA,EUROPE,3,3
-SAUDI ARABIA,MIDDLE EAST,4,4
-VIETNAM,ASIA,2,2
 RUSSIA,EUROPE,3,3
+SAUDI ARABIA,MIDDLE EAST,4,4
 UNITED KINGDOM,EUROPE,3,3
-UNITED STATES,AMERICA,1,1
\ No newline at end of file
+UNITED STATES,AMERICA,1,1
+VIETNAM,ASIA,2,2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin2.result b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin2.result
index 178ddd6..c83d6d6 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin2.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin2.result
@@ -4,6 +4,7 @@ ALGERIA,AFRICA
 ARGENTINA,AMERICA
 BRAZIL,AMERICA
 CANADA,AMERICA
+CHINA,ASIA
 EGYPT,MIDDLE EAST
 ETHIOPIA,AFRICA
 FRANCE,EUROPE
@@ -18,10 +19,9 @@ KENYA,AFRICA
 MOROCCO,AFRICA
 MOZAMBIQUE,AFRICA
 PERU,AMERICA
-CHINA,ASIA
 ROMANIA,EUROPE
-SAUDI ARABIA,MIDDLE EAST
-VIETNAM,ASIA
 RUSSIA,EUROPE
+SAUDI ARABIA,MIDDLE EAST
 UNITED KINGDOM,EUROPE
-UNITED STATES,AMERICA
\ No newline at end of file
+UNITED STATES,AMERICA
+VIETNAM,ASIA
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin3.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin3.result b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin3.result
index 9f3123a..e559818 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin3.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin3.result
@@ -4,6 +4,7 @@ ALGERIA,AFRICA,1,1
 ARGENTINA,AMERICA,2,2
 BRAZIL,AMERICA,3,2
 CANADA,AMERICA,4,2
+CHINA,ASIA,19,3
 EGYPT,MIDDLE EAST,5,5
 ETHIOPIA,AFRICA,6,1
 FRANCE,EUROPE,7,4
@@ -18,10 +19,9 @@ KENYA,AFRICA,15,1
 MOROCCO,AFRICA,16,1
 MOZAMBIQUE,AFRICA,17,1
 PERU,AMERICA,18,2
-CHINA,ASIA,19,3
 ROMANIA,EUROPE,20,4
-SAUDI ARABIA,MIDDLE EAST,21,5
-VIETNAM,ASIA,22,3
 RUSSIA,EUROPE,23,4
+SAUDI ARABIA,MIDDLE EAST,21,5
 UNITED KINGDOM,EUROPE,24,4
-UNITED STATES,AMERICA,25,2
\ No newline at end of file
+UNITED STATES,AMERICA,25,2
+VIETNAM,ASIA,22,3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin4.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin4.result b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin4.result
index b7f95a8..90df873 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin4.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testWhereClauseJoin4.result
@@ -4,6 +4,7 @@ ALGERIA,AFRICA,0
 ARGENTINA,AMERICA,2
 BRAZIL,AMERICA,3
 CANADA,AMERICA,4
+CHINA,ASIA,20
 EGYPT,MIDDLE EAST,8
 ETHIOPIA,AFRICA,5
 FRANCE,EUROPE,9
@@ -18,10 +19,9 @@ KENYA,AFRICA,14
 MOROCCO,AFRICA,15
 MOZAMBIQUE,AFRICA,16
 PERU,AMERICA,18
-CHINA,ASIA,20
 ROMANIA,EUROPE,22
-SAUDI ARABIA,MIDDLE EAST,24
-VIETNAM,ASIA,23
 RUSSIA,EUROPE,25
+SAUDI ARABIA,MIDDLE EAST,24
 UNITED KINGDOM,EUROPE,26
-UNITED STATES,AMERICA,25
\ No newline at end of file
+UNITED STATES,AMERICA,25
+VIETNAM,ASIA,23
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
new file mode 100644
index 0000000..4cec67d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -0,0 +1,62 @@
+package org.apache.tajo.storage; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+
+public class NullScanner extends FileScanner {
+  public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    progress = 1.0f;
+
+    return null;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    progress = 0.0f;
+  }
+
+  @Override
+  public void close() throws IOException {
+    progress = 0.0f;
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return true;
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 1b852d4..4b23f4d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
@@ -54,6 +55,16 @@ public class StorageManager extends AbstractStorageManager {
 
   @Override
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    if (fragment instanceof FileFragment) {
+      FileFragment fileFragment = (FileFragment)fragment;
+      if (fileFragment.getEndKey() == 0) {
+        Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
+        scanner.setTarget(target.toArray());
+
+        return scanner;
+      }
+    }
+
     Scanner scanner;
 
     Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());

http://git-wip-us.apache.org/repos/asf/tajo/blob/8cf508aa/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
index cffff00..2fd4a99 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -25,7 +25,9 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.NullScanner;
 import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
@@ -72,6 +74,16 @@ public final class StorageManagerV2 extends AbstractStorageManager {
 
   @Override
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    if (fragment instanceof FileFragment) {
+      FileFragment fileFragment = (FileFragment)fragment;
+      if (fileFragment.getEndKey() == 0) {
+        Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
+        scanner.setTarget(target.toArray());
+
+        return scanner;
+      }
+    }
+
     Scanner scanner;
 
     Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());


Mime
View raw message