tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [3/3] git commit: TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)
Date Wed, 02 Oct 2013 01:33:20 GMT
TAJO-216: Improve FilterPushDownRule and Implement physical operators for outer join. (camelia_c via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3e6d684a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3e6d684a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3e6d684a

Branch: refs/heads/master
Commit: 3e6d684a9f1c7592c99d359c376956fcc9e917ba
Parents: 406337d
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Oct 2 10:29:03 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Oct 2 10:32:23 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/tajo/datum/CharDatum.java   |  23 +-
 .../java/org/apache/tajo/datum/Float4Datum.java |  92 ++--
 .../java/org/apache/tajo/datum/Float8Datum.java |  64 ++-
 .../java/org/apache/tajo/datum/Int2Datum.java   |  65 ++-
 .../java/org/apache/tajo/datum/Int4Datum.java   |  58 ++-
 .../java/org/apache/tajo/datum/Int8Datum.java   |  63 ++-
 .../java/org/apache/tajo/datum/NullDatum.java   |   2 +-
 .../java/org/apache/tajo/datum/TextDatum.java   |  27 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 178 ++++++-
 .../planner/physical/FullOuterHashJoinExec.java | 252 +++++++++
 .../physical/FullOuterMergeJoinExec.java        | 334 ++++++++++++
 .../planner/physical/LeftOuterHashJoinExec.java | 214 ++++++++
 .../planner/physical/LeftOuterNLJoinExec.java   | 129 +++++
 .../physical/RightOuterMergeJoinExec.java       | 343 ++++++++++++
 .../planner/rewrite/FilterPushDownRule.java     | 111 +++-
 .../org/apache/tajo/engine/utils/TupleUtil.java |  16 +
 .../physical/TestFullOuterHashJoinExec.java     | 394 ++++++++++++++
 .../physical/TestFullOuterMergeJoinExec.java    | 516 +++++++++++++++++++
 .../physical/TestLeftOuterHashJoinExec.java     | 450 ++++++++++++++++
 .../physical/TestLeftOuterNLJoinExec.java       | 459 +++++++++++++++++
 .../planner/physical/TestMergeJoinExec.java     |   2 +-
 .../physical/TestRightOuterHashJoinExec.java    | 342 ++++++++++++
 .../physical/TestRightOuterMergeJoinExec.java   | 506 ++++++++++++++++++
 24 files changed, 4518 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a148c20..e802376 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-216: Improve FilterPushDownRule and Implement physical operators 
+    for outer join. (camelia_c via hyunsik)
+
     TAJO-211: Implement regexp_replace function. (hyunsik)
 
     TAJO-212: Implement type cast expresion. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
index 9ce9089..651b00b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/CharDatum.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.datum;
 
+import com.google.common.primitives.UnsignedBytes;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.datum.exception.InvalidOperationException;
 
@@ -135,10 +136,14 @@ public class CharDatum extends Datum {
   @Override
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
-    case CHAR:
-      return DatumFactory.createBool(this.equals(datum));
-    default:
-      throw new InvalidOperationException(datum.type());
+      case CHAR:
+        return DatumFactory.createBool(this.equals(datum));
+
+      case NULL:
+        return DatumFactory.createBool(false);
+
+      default:
+        throw new InvalidOperationException();
     }
   }
   
@@ -147,9 +152,13 @@ public class CharDatum extends Datum {
     switch (datum.type()) {
       case CHAR:
         CharDatum other = (CharDatum) datum;
-        return this.getString().compareTo(other.getString());
-    default:
-      throw new InvalidOperationException(datum.type());
+        return UnsignedBytes.lexicographicalComparator().compare(bytes, other.bytes);
+
+      case NULL:
+        return -1;
+
+      default:
+        throw new InvalidOperationException();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
index 97e608a..8de20ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java
@@ -33,7 +33,7 @@ public class Float4Datum extends Datum implements NumericDatum {
 	public Float4Datum() {
 		super(TajoDataTypes.Type.FLOAT4);
 	}
-	
+
 	public Float4Datum(float val) {
 		this();
 		this.val = val;
@@ -44,7 +44,7 @@ public class Float4Datum extends Datum implements NumericDatum {
     ByteBuffer bb = ByteBuffer.wrap(bytes);
     this.val = bb.getFloat();
   }
-	
+
 	public boolean asBool() {
 		throw new InvalidCastException();
 	}
@@ -53,7 +53,7 @@ public class Float4Datum extends Datum implements NumericDatum {
   public char asChar() {
     return asChars().charAt(0);
   }
-	
+
 	@Override
 	public short asInt2() {
 		return (short) val;
@@ -105,7 +105,7 @@ public class Float4Datum extends Datum implements NumericDatum {
   public int size() {
     return size;
   }
-  
+
   @Override
   public int hashCode() {
     return (int) val;
@@ -117,73 +117,87 @@ public class Float4Datum extends Datum implements NumericDatum {
       Float4Datum other = (Float4Datum) obj;
       return val == other.val;
     }
-    
+
     return false;
   }
 
   @Override
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
-    case INT2:
-      return DatumFactory.createBool(val == datum.asInt2());
-    case INT4:
-      return DatumFactory.createBool(val == datum.asInt4());
-    case INT8:
-      return DatumFactory.createBool(val == datum.asInt8());
-    case FLOAT4:
-      return DatumFactory.createBool(val == datum.asFloat4());
-    case FLOAT8:
-      return DatumFactory.createBool(val == datum.asFloat8());
-    default:
-      throw new InvalidOperationException(datum.type());
+      case INT2:
+        return DatumFactory.createBool(val == datum.asInt2());
+      case INT4:
+        return DatumFactory.createBool(val == datum.asInt4());
+      case INT8:
+        return DatumFactory.createBool(val == datum.asInt8());
+      case FLOAT4:
+        return DatumFactory.createBool(val == datum.asFloat4());
+      case FLOAT8:
+        return DatumFactory.createBool(val == datum.asFloat8());
+      case NULL:
+        return DatumFactory.createBool(false);
+      default:
+        throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -200,6 +214,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -218,6 +234,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -236,6 +254,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -254,6 +274,8 @@ public class Float4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -272,6 +294,8 @@ public class Float4Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val / datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val / datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }
@@ -279,6 +303,6 @@ public class Float4Datum extends Datum implements NumericDatum {
 
   @Override
   public void inverseSign() {
-    this.val = - val;    
+    this.val = - val;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
index d531942..1857929 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
@@ -28,10 +28,16 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Float8Datum extends Datum implements NumericDatum {
   private static final int size = 8;
   @Expose private double val;
 
+  private static final Log LOG = LogFactory.getLog(Float8Datum.class);
+
 	public Float8Datum() {
 		super(TajoDataTypes.Type.FLOAT8);
 	}
@@ -126,56 +132,70 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
-      throw new InvalidOperationException(datum.type());
+      throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8:{
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -192,6 +212,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -210,6 +232,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -228,6 +252,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -246,6 +272,8 @@ public class Float8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -264,6 +292,8 @@ public class Float8Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat8(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
index b34031f..000a107 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java
@@ -25,10 +25,17 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Int2Datum extends Datum implements NumericDatum {
   private static final int size = 2;  
   @Expose private short val;
 
+  private static final Log LOG = LogFactory.getLog(Int2Datum.class);
+
   public Int2Datum() {
     super(TajoDataTypes.Type.INT2);
   }
@@ -124,56 +131,70 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
-      throw new InvalidOperationException(datum.type());
+      throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -190,6 +211,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -208,6 +231,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -226,6 +251,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -244,6 +271,8 @@ public class Int2Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -262,6 +291,8 @@ public class Int2Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
index 1422187..a68a5f1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java
@@ -20,12 +20,12 @@ package org.apache.tajo.datum;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.exception.InvalidCastException;
 import org.apache.tajo.datum.exception.InvalidOperationException;
 import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
 public class Int4Datum extends Datum implements NumericDatum {
   private static final int size = 4;
   @Expose private int val;
@@ -129,6 +129,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createBool(val == datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createBool(val == datum.asFloat8());
+    case NULL:
+      return DatumFactory.createBool(false);
     default:
       throw new InvalidOperationException();
     }
@@ -137,48 +139,60 @@ public class Int4Datum extends Datum implements NumericDatum {
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4:{
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8: {
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -195,6 +209,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -213,6 +229,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -231,6 +249,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException();
     }
@@ -249,6 +269,8 @@ public class Int4Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat4(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -267,6 +289,8 @@ public class Int4Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat4(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
index 689adbc..7e8bd27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java
@@ -26,6 +26,11 @@ import org.apache.tajo.util.NumberUtil;
 
 import java.nio.ByteBuffer;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
 public class Int8Datum extends Datum implements NumericDatum {
   private static final int size = 8;
   @Expose private long val;
@@ -135,56 +140,70 @@ public class Int8Datum extends Datum implements NumericDatum {
         return DatumFactory.createBool(val == datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createBool(val == datum.asFloat8());
+      case NULL:
+        return DatumFactory.createBool(false);
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
   @Override
   public int compareTo(Datum datum) {
     switch (datum.type()) {
-      case INT2:
-        if (val < datum.asInt2()) {
+      case INT2: {
+        short another = datum.asInt2();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt2() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT4:
-        if (val < datum.asInt4()) {
+      }
+      case INT4: {
+        int another = datum.asInt4();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case INT8:
-        if (val < datum.asInt8()) {
+      }
+      case INT8: {
+        long another = datum.asInt8();
+        if (val < another) {
           return -1;
-        } else if (datum.asInt8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT4:
-        if (val < datum.asFloat4()) {
+      }
+      case FLOAT4: {
+        float another = datum.asFloat4();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat4() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
-      case FLOAT8:
-        if (val < datum.asFloat8()) {
+      }
+      case FLOAT8:{
+        double another = datum.asFloat8();
+        if (val < another) {
           return -1;
-        } else if (datum.asFloat8() < val) {
+        } else if (val > another) {
           return 1;
         } else {
           return 0;
         }
+      }
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -201,6 +220,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val + datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val + datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -219,6 +240,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val - datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val - datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -237,6 +260,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val * datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val * datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -255,6 +280,8 @@ public class Int8Datum extends Datum implements NumericDatum {
       return DatumFactory.createFloat8(val / datum.asFloat4());
     case FLOAT8:
       return DatumFactory.createFloat8(val / datum.asFloat8());
+    case NULL:
+      return datum;
     default:
       throw new InvalidOperationException(datum.type());
     }
@@ -273,6 +300,8 @@ public class Int8Datum extends Datum implements NumericDatum {
         return DatumFactory.createFloat8(val % datum.asFloat4());
       case FLOAT8:
         return DatumFactory.createFloat8(val % datum.asFloat8());
+      case NULL:
+        return datum;
       default:
         throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
index 10c35a9..2cce626 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/NullDatum.java
@@ -104,4 +104,4 @@ public class NullDatum extends Datum {
   public int hashCode() {
     return 0; // one of the prime number
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index bab99a2..a155f1a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.datum;
 
+import com.google.common.primitives.UnsignedBytes;
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.tajo.common.TajoDataTypes;
@@ -26,6 +27,9 @@ import org.apache.tajo.datum.exception.InvalidOperationException;
 
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public class TextDatum extends Datum {
   @Expose private int size;
   @Expose private byte [] bytes;
@@ -102,11 +106,14 @@ public class TextDatum extends Datum {
   public int compareTo(Datum datum) {
     switch (datum.type()) {
       case TEXT:
-        byte[] o = datum.asByteArray();
-        return WritableComparator.compareBytes(this.bytes, 0, this.bytes.length,
-            o, 0, o.length);
+      case CHAR:
+      case BLOB:
+        return UnsignedBytes.lexicographicalComparator().compare(bytes, datum.asByteArray());
+
+      case NULL:
+        return -1;
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 
@@ -114,7 +121,7 @@ public class TextDatum extends Datum {
   public boolean equals(Object obj) {
     if (obj instanceof TextDatum) {
       TextDatum o = (TextDatum) obj;
-      return Arrays.equals(this.bytes, o.bytes);
+      return UnsignedBytes.lexicographicalComparator().compare(this.bytes, o.bytes) == 0;
     }
 
     return false;
@@ -124,10 +131,14 @@ public class TextDatum extends Datum {
   public BooleanDatum equalsTo(Datum datum) {
     switch (datum.type()) {
       case TEXT:
-        return DatumFactory.createBool(
-            Arrays.equals(this.bytes, datum.asByteArray()));
+      case CHAR:
+      case BLOB:
+        return DatumFactory.createBool(UnsignedBytes.lexicographicalComparator()
+            .compare(bytes, datum.asByteArray()) == 0);
+      case NULL:
+        return DatumFactory.createBool(false);
       default:
-        throw new InvalidOperationException(datum.type());
+        throw new InvalidOperationException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 8110a3b..c6e4695 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -57,6 +57,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
 
+  final long threshold = 1048576 * 128; // 128MB
+
 
   public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
     this.conf = conf;
@@ -194,15 +196,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     switch (joinNode.getJoinType()) {
       case CROSS:
-        LOG.info("The planner chooses [Nested Loop Join]");
         return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
 
       case INNER:
         return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
 
-      case FULL_OUTER:
       case LEFT_OUTER:
+        return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
       case RIGHT_OUTER:
+        return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case FULL_OUTER:
+        return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
 
       case LEFT_SEMI:
       case RIGHT_SEMI:
@@ -215,6 +221,171 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+        case NESTED_LOOP_JOIN:
+          // the enforcer explicitly requested the nested-loop implementation of left outer join
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+          return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long rightTableVolume = estimateSizeRecursive(context, rightLineage);
+
+    if (rightTableVolume < threshold) {
+      // we can implement left outer join using hash join, using the right operand as the build relation
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new LeftOuterHashJoinExec(context, plan, leftExec, rightExec);
+    }
+    else {
+      //the right operand is too large, so we opt for NL implementation of left outer join
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+      return new LeftOuterNLJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
+    // blocking, but merge join is blocking as well)
+    String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+    long outerSize4 = estimateSizeRecursive(context, outerLineage4);
+    if (outerSize4 < threshold){
+      LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+    } else {
+      return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    //the left operand is too large, so opt for merge join implementation
+    LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+    SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort2 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs2[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+    ExternalSortExec innerSort2 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs2[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+    return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
+  }
+
+  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new LeftOuterHashJoinExec(context, plan, rightExec, leftExec);
+        case MERGE_JOIN:
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+
+        case MERGE_JOIN:
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+
+        default:
+          LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private FullOuterHashJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                            PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
+
+    PhysicalExec selectedRight;
+    PhysicalExec selectedLeft;
+
+    // HashJoinExec loads the smaller relation to memory.
+    if (outerSize2 <= innerSize2) {
+      selectedLeft = leftExec;
+      selectedRight = rightExec;
+    } else {
+      selectedLeft = rightExec;
+      selectedRight = leftExec;
+    }
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
+    return new FullOuterHashJoinExec(context, plan, selectedRight, selectedLeft);
+  }
+
+  private FullOuterMergeJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                              PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    // if size too large, full outer merge join implementation
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
+    SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
+        leftExec.getSchema(), rightExec.getSchema());
+    ExternalSortExec outerSort3 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs3[0], leftExec.getSchema(), leftExec.getSchema()), leftExec);
+    ExternalSortExec innerSort3 = new ExternalSortExec(context, sm,
+        new SortNode(UNGENERATED_PID,sortSpecs3[1], rightExec.getSchema(), rightExec.getSchema()), rightExec);
+
+    return new FullOuterMergeJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+  }
+
+  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
+    final long threshold = 1048576 * 128;
+    if (outerSize2 < threshold || innerSize2 < threshold) {
+      return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+    } else {
+      return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
   private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
@@ -358,8 +529,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new StoreTableExec(ctx, plan, subOp);
   }
 
-  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
-      throws IOException {
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
     Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
         "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
new file mode 100644
index 0000000..aebfdb5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterHashJoinExec.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+
+
+public class FullOuterHashJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporary tuples and states for the hash join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private Map<Tuple, Boolean> matched;
+
+  public FullOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
+                               PhysicalExec inner) {
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
+        plan.getOutSchema(), outer, inner);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
+    // we have a boolean flag, initially false (whether this join key had at least one match on the left operand)
+    this.matched = new HashMap<Tuple, Boolean>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
+        outer.getSchema(), inner.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = outer.getSchema().getColumnNum();
+    rightNumCols = inner.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple getNextUnmatchedRight() {
+
+    List<Tuple> newValue;
+    Tuple returnedTuple;
+    // get a key tuple from the matched hashmap whose flag is still false
+    for(Tuple aKeyTuple : matched.keySet()) {
+      if(matched.get(aKeyTuple) == false) {
+        newValue = tupleSlots.get(aKeyTuple);
+        returnedTuple = newValue.remove(0);
+        tupleSlots.put(aKeyTuple, newValue);
+
+        // after taking the last element from the list in tupleSlots, set flag true in matched as well
+        if(newValue.isEmpty()){
+          matched.put(aKeyTuple, true);
+        }
+
+        return returnedTuple;
+      }
+    }
+    return null;
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          // at this stage we can begin outputting the unmatched tuples of the right operand (still held in tupleSlots), null-padded on the left side
+          Tuple unmatchedRightTuple = getNextUnmatchedRight();
+          if( unmatchedRightTuple == null) {
+            finished = true;
+            outTuple = null;
+            return null;
+          } else {
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+
+            return outTuple;
+          }
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple doesn't have a match on the right, but this is a full outer join => we should keep it anyway
+          // output a tuple whose right side is padded with nulls
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+        getKeyLeftTuple(leftTuple, leftKeyTuple);
+        matched.put(leftKeyTuple, true);
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+        matched.put(keyTuple,false);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
new file mode 100644
index 0000000..2c11fea
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/FullOuterMergeJoinExec.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FullOuterMergeJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode joinNode;
+  private EvalNode joinQual;
+  private EvalContext qualCtx;
+
+  // temporary tuples and states for the merge join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private Tuple leftNext = null;
+
+  private final List<Tuple> leftTupleSlots;
+  private final List<Tuple> rightTupleSlots;
+
+  private JoinTupleComparator joincomparator = null;
+  private TupleComparator[] tupleComparator = null;
+
+  private final static int INITIAL_TUPLE_SLOT = 10000;
+
+  private boolean end = false;
+
+  // projection
+  private final Projector projector;
+  private final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private int posRightTupleSlots = -1;
+  private int posLeftTupleSlots = -1;
+  boolean endInPopulationStage = false;
+  private boolean initRightDone = false;
+
+  public FullOuterMergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                                PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
+        "but there is no join condition");
+    this.joinNode = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = this.joinQual.newContext();
+
+    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    SortSpec[][] sortSpecs = new SortSpec[2][];
+    sortSpecs[0] = leftSortKey;
+    sortSpecs[1] = rightSortKey;
+
+    this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
+        rightChild.getSchema(), sortSpecs);
+    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+        plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  public JoinNode getPlan(){
+    return this.joinNode;
+  }
+
+  public Tuple next() throws IOException {
+    Tuple previous;
+
+    for (;;) {
+      boolean newRound = false;
+      if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
+        newRound = true;
+      }
+      if ((posRightTupleSlots == rightTupleSlots.size()) && (posLeftTupleSlots == leftTupleSlots.size())) {
+        newRound = true;
+      }
+
+      if(newRound == true){
+
+        if (end) {
+
+          ////////////////////////////////////////////////////////////////////////
+          // FINALIZING STAGE
+          ////////////////////////////////////////////////////////////////////////
+          // the finalizing stage, where remaining tuples on the right are
+          // transformed into left-padded results while tuples on the left
+          // are transformed into right-padded results
+
+          // before exit, a left-padded tuple should be built for all remaining
+          // right side and a right-padded tuple should be built for all remaining
+          // left side
+
+          if (initRightDone == false) {
+            // maybe the left operand was empty => the right one didn't have the chance to initialize
+            rightTuple = rightChild.next();
+            initRightDone = true;
+          }
+
+          if((leftTuple == null) && (rightTuple == null)) {
+            return null;
+          }
+
+          if((leftTuple == null) && (rightTuple != null)){
+            // output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            rightTuple = rightChild.next();
+            return outTuple;
+          }
+
+          if((leftTuple != null) && (rightTuple == null)){
+            // output a tuple with the nulls padded rightTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            leftTuple = leftChild.next();
+            return outTuple;
+          }
+        } // if end
+
+        ////////////////////////////////////////////////////////////////////////
+        // INITIALIZING STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // initializing stage, reading the first tuple on each side
+        if (leftTuple == null) {
+          leftTuple = leftChild.next();
+          if( leftTuple == null){
+            end = true;
+            continue;
+          }
+        }
+        if (rightTuple == null) {
+          rightTuple = rightChild.next();
+          initRightDone = true;
+          if (rightTuple == null) {
+            end = true;
+            continue;
+          }
+        }
+
+        // reset tuple slots for a new round
+        leftTupleSlots.clear();
+        rightTupleSlots.clear();
+        posRightTupleSlots = -1;
+        posLeftTupleSlots = -1;
+
+        ////////////////////////////////////////////////////////////////////////
+        // Comparison and Move Forward Stage
+        ////////////////////////////////////////////////////////////////////////
+        // advance alternatively on each side until a match is found
+        int cmp;
+        while (!end && ((cmp = joincomparator.compare(leftTuple, rightTuple)) != 0)) {
+
+          if (cmp > 0) {
+
+            //before getting a new tuple from the right,  a leftnullpadded tuple should be built
+            //output a tuple with the nulls padded leftTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            frameTuple.set(nullPaddedTuple, rightTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // BEFORE RETURN, MOVE FORWARD
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } else if (cmp < 0) {
+            // before getting a new tuple from the left,  a rightnullpadded tuple should be built
+            // output a tuple with the nulls padded rightTuple
+            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            frameTuple.set(leftTuple, nullPaddedTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            // we simulate we found a match, which is exactly the null padded one
+            // BEFORE RETURN, MOVE FORWARD
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              end = true;
+            }
+
+            return outTuple;
+
+          } // if (cmp < 0)
+        } //while
+
+
+        ////////////////////////////////////////////////////////////////////////
+        // SLOTS POPULATION STAGE
+        ////////////////////////////////////////////////////////////////////////
+        // once a match is found, retain all tuples with this key in tuple slots
+        // on each side
+        if(!end) {
+          endInPopulationStage = false;
+
+          boolean endLeft = false;
+          boolean endRight = false;
+
+          previous = new VTuple(leftTuple);
+          do {
+            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTuple = leftChild.next();
+            if(leftTuple == null) {
+              endLeft = true;
+            }
+
+
+          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          posLeftTupleSlots = 0;
+
+
+          previous = new VTuple(rightTuple);
+          do {
+            rightTupleSlots.add(new VTuple(rightTuple));
+            rightTuple = rightChild.next();
+            if(rightTuple == null) {
+              endRight = true;
+            }
+
+          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          posRightTupleSlots = 0;
+
+          if ((endLeft == true) || (endRight == true)) {
+            end = true;
+            endInPopulationStage = true;
+          }
+
+        } // if end false
+      } // if newRound
+
+
+      ////////////////////////////////////////////////////////////////////////
+      // RESULTS STAGE
+      ////////////////////////////////////////////////////////////////////////
+      // now output result matching tuples from the slots
+      // if either we haven't reached the end on either side, or we did reach the
+      // end on one (or both) sides but that happened in the slots population step
+      // (i.e. it refers to the next round)
+      if(!end || (end && endInPopulationStage)){
+        if(posLeftTupleSlots == 0){
+          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          posLeftTupleSlots = posLeftTupleSlots + 1;
+        }
+
+        if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
+          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+          posRightTupleSlots = posRightTupleSlots + 1;
+          frameTuple.set(leftNext, aTuple);
+          joinQual.eval(qualCtx, inSchema, frameTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        } else {
+          // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
+          if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
+            //rewind the right slots position
+            posRightTupleSlots = 0;
+            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+            posRightTupleSlots = posRightTupleSlots + 1;
+            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            posLeftTupleSlots = posLeftTupleSlots + 1;
+
+            frameTuple.set(leftNext, aTuple);
+            joinQual.eval(qualCtx, inSchema, frameTuple);
+            projector.eval(evalContexts, frameTuple);
+            projector.terminate(evalContexts, outTuple);
+            return outTuple;
+          }
+        }
+      } // the second if end false
+    } // for
+  }
+
+
+  /**
+   * Restarts this operator: delegates to the superclass rescan, then invalidates
+   * both slot cursors and discards the tuples buffered in the left and right slots.
+   *
+   * @throws IOException propagated from the superclass rescan
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    // -1 is the same "no position" value next() assigns when it starts a new round
+    posLeftTupleSlots = -1;
+    posRightTupleSlots = -1;
+
+    // drop whatever was buffered for the previous key group
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+
+    // NOTE(review): other state read by next() (end, initRightDone, leftTuple,
+    // rightTuple) is not reset here -- confirm this is intended.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
new file mode 100644
index 0000000..bbf39dd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterHashJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.tajo.datum.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class LeftOuterHashJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  protected JoinNode plan;
+  protected EvalNode joinQual;
+
+  protected List<Column[]> joinKeyPairs;
+
+  // temporal tuples and states for nested loop join
+  protected boolean first = true;
+  protected FrameTuple frameTuple;
+  protected Tuple outTuple = null;
+  protected Map<Tuple, List<Tuple>> tupleSlots;
+  protected Iterator<Tuple> iterator = null;
+  protected EvalContext qualCtx;
+  protected Tuple leftTuple;
+  protected Tuple leftKeyTuple;
+
+  protected int [] leftKeyList;
+  protected int [] rightKeyList;
+
+  protected boolean finished = false;
+  protected boolean shouldGetLeftTuple = true;
+
+  // projection
+  protected final Projector projector;
+  protected final EvalContext [] evalContexts;
+
+  private int rightNumCols;
+  private int leftNumCols;
+  private static final Log LOG = LogFactory.getLog(LeftOuterHashJoinExec.class);
+
+  public LeftOuterHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                               PhysicalExec rightChild) {
+    super(context, SchemaUtil.merge(leftChild.getSchema(), rightChild.getSchema()),
+        plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    this.qualCtx = joinQual.newContext();
+    this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
+
+    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
+
+    leftKeyList = new int[joinKeyPairs.size()];
+    rightKeyList = new int[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      leftKeyList[i] = leftChild.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
+    }
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      rightKeyList[i] = rightChild.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+    }
+
+    // for projection
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.evalContexts = projector.renew();
+
+    // for join
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+    leftKeyTuple = new VTuple(leftKeyList.length);
+
+    leftNumCols = leftChild.getSchema().getColumnNum();
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
+    for (int i = 0; i < leftKeyList.length; i++) {
+      keyTuple.put(i, outerTuple.get(leftKeyList[i]));
+    }
+  }
+
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean found = false;
+
+    while(!finished) {
+
+      if (shouldGetLeftTuple) { // initially, it is true.
+        // getting new outer
+        leftTuple = leftChild.next(); // it comes from a disk
+        if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+          finished = true;
+          return null;
+        }
+
+        // getting corresponding right
+        getKeyLeftTuple(leftTuple, leftKeyTuple); // get a left key tuple
+        if (tupleSlots.containsKey(leftKeyTuple)) { // finds right tuples on in-memory hash table.
+          iterator = tupleSlots.get(leftKeyTuple).iterator();
+          shouldGetLeftTuple = false;
+        } else {
+          // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          shouldGetLeftTuple = true;
+          return outTuple;
+        }
+      }
+
+      // getting a next right tuple on in-memory hash table.
+      rightTuple = iterator.next();
+      frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        found = true;
+      }
+
+      if (!iterator.hasNext()) { // no more right tuples for this hash key
+        shouldGetLeftTuple = true;
+      }
+
+      if (found) {
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+
+  protected void loadRightToHashTable() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+
+    while ((tuple = rightChild.next()) != null) {
+      keyTuple = new VTuple(joinKeyPairs.size());
+      List<Tuple> newValue;
+      for (int i = 0; i < rightKeyList.length; i++) {
+        keyTuple.put(i, tuple.get(rightKeyList[i]));
+      }
+
+      if (tupleSlots.containsKey(keyTuple)) {
+        newValue = tupleSlots.get(keyTuple);
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      } else {
+        newValue = new ArrayList<Tuple>();
+        newValue.add(tuple);
+        tupleSlots.put(keyTuple, newValue);
+      }
+    }
+    first = false;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    tupleSlots.clear();
+    first = true;
+
+    finished = false;
+    iterator = null;
+    shouldGetLeftTuple = true;
+  }
+
+  public void close() throws IOException {
+    tupleSlots.clear();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3e6d684a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
new file mode 100644
index 0000000..cc7b331
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/LeftOuterNLJoinExec.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+public class LeftOuterNLJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  private EvalNode joinQual;
+
+  // temporal tuples and states for nested loop join
+  private boolean needNextRightTuple;
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple rightTuple = null;
+  private Tuple outTuple = null;
+  private EvalContext qualCtx;
+
+  // projection
+  private final EvalContext [] evalContexts;
+  private final Projector projector;
+
+  private boolean foundAtLeastOneMatch;
+  private int rightNumCols;
+
+  public LeftOuterNLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
+                             PhysicalExec rightChild) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftChild, rightChild);
+    this.plan = plan;
+
+    if (plan.hasJoinQual()) {
+      this.joinQual = plan.getJoinQual();
+      this.qualCtx = this.joinQual.newContext();
+    }
+
+    // for projection
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    evalContexts = projector.renew();
+
+    // for join
+    needNextRightTuple = true;
+    frameTuple = new FrameTuple();
+    outTuple = new VTuple(outSchema.getColumnNum());
+
+    foundAtLeastOneMatch = false;
+    rightNumCols = rightChild.getSchema().getColumnNum();
+  }
+
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  public Tuple next() throws IOException {
+    for (;;) {
+      if (needNextRightTuple) {
+        leftTuple = leftChild.next();
+        if (leftTuple == null) {
+          return null;
+        }
+        needNextRightTuple = false;
+        // a new tuple from the left child has initially no matches on the right operand
+        foundAtLeastOneMatch = false;
+      }
+      rightTuple = rightChild.next();
+
+      if (rightTuple == null) {
+        // the scan of the right operand is finished with no matches found
+        if(foundAtLeastOneMatch == false){
+          //output a tuple with the nulls padded rightTuple
+          Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+          frameTuple.set(leftTuple, nullPaddedTuple);
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          // we simulate we found a match, which is exactly the null padded one
+          foundAtLeastOneMatch = true;
+          needNextRightTuple = true;
+          rightChild.rescan();
+          return outTuple;
+        } else {
+          needNextRightTuple = true;
+          rightChild.rescan();
+          continue;
+        }
+      }
+
+      frameTuple.set(leftTuple, rightTuple);
+      joinQual.eval(qualCtx, inSchema, frameTuple);
+      if (joinQual.terminate(qualCtx).asBool()) {
+        projector.eval(evalContexts, frameTuple);
+        projector.terminate(evalContexts, outTuple);
+        foundAtLeastOneMatch = true;
+        return outTuple;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    needNextRightTuple = true;
+  }
+}


Mime
View raw message