tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [6/6] git commit: TAJO-907: Implement off-heap tuple block and zero-copy tuple.
Date Tue, 16 Sep 2014 11:06:13 GMT
TAJO-907: Implement off-heap tuple block and zero-copy tuple.

Closes #133


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e6e2a6b7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e6e2a6b7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e6e2a6b7

Branch: refs/heads/block_iteration
Commit: e6e2a6b75ea57658011dd6e035e9ba6b7302be54
Parents: 15450e8
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Sep 16 19:46:29 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Sep 16 19:46:42 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +
 .../org/apache/tajo/catalog/SchemaUtil.java     | 103 ++++
 .../src/main/proto/CatalogProtos.proto          |  15 +-
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |   2 +-
 .../java/org/apache/tajo/datum/DateDatum.java   |   2 +-
 .../main/java/org/apache/tajo/datum/Datum.java  |   4 +-
 .../org/apache/tajo/datum/IntervalDatum.java    |   2 +-
 .../java/org/apache/tajo/datum/TextDatum.java   |   6 +-
 .../java/org/apache/tajo/datum/TimeDatum.java   |  17 +-
 .../org/apache/tajo/util/Deallocatable.java     |  23 +
 .../java/org/apache/tajo/util/FileUtil.java     |  16 +-
 .../main/java/org/apache/tajo/util/SizeOf.java  | 159 +++++
 .../org/apache/tajo/util/UnsafeComparer.java    | 160 +++++
 .../java/org/apache/tajo/util/UnsafeUtil.java   |  73 +++
 tajo-core/pom.xml                               |   8 +
 .../tajo/engine/codegen/CodeGenUtils.java       |  15 +
 .../tajo/engine/codegen/CompilationError.java   |   5 +
 .../tajo/engine/codegen/EvalCodeGenContext.java |   2 +-
 .../tajo/engine/codegen/EvalCodeGenerator.java  |  94 +--
 .../engine/codegen/ExecutorPreCompiler.java     |  67 ++-
 .../engine/codegen/TajoGeneratorAdapter.java    | 110 +++-
 .../apache/tajo/engine/codegen/TupleCmp.java    |  31 +
 .../engine/codegen/TupleComparerCompiler.java   | 318 ++++++++++
 .../tajo/engine/function/builtin/AvgDouble.java |   2 +-
 .../tajo/engine/function/builtin/AvgLong.java   |   2 +-
 .../tajo/engine/function/builtin/Coalesce.java  |   2 +-
 .../engine/function/builtin/CountValue.java     |   3 +-
 .../function/builtin/SumDoubleDistinct.java     |   3 +-
 .../function/builtin/SumFloatDistinct.java      |   3 +-
 .../engine/function/builtin/SumIntDistinct.java |   3 +-
 .../function/builtin/SumLongDistinct.java       |   3 +-
 .../function/datetime/DatePartFromDate.java     |   4 +-
 .../function/datetime/DatePartFromTime.java     |   5 +-
 .../datetime/DatePartFromTimestamp.java         |   4 +-
 .../datetime/DateTimePartFromUnixTimestamp.java |   6 +-
 .../function/datetime/ToTimestampInt.java       |   2 +-
 .../function/geoip/GeoIPCountryInet4.java       |   2 +-
 .../engine/function/geoip/GeoIPCountryText.java |   2 +-
 .../function/geoip/GeoIPInCountryInet4.java     |   2 +-
 .../function/geoip/GeoIPInCountryText.java      |   2 +-
 .../apache/tajo/engine/function/math/Ceil.java  |   2 +-
 .../apache/tajo/engine/function/math/Cos.java   |   2 +-
 .../tajo/engine/function/math/Degrees.java      |   2 +-
 .../apache/tajo/engine/function/math/Div.java   |   2 +-
 .../apache/tajo/engine/function/math/Exp.java   |   2 +-
 .../apache/tajo/engine/function/math/Floor.java |   2 +-
 .../apache/tajo/engine/function/math/Mod.java   |   2 +-
 .../apache/tajo/engine/function/math/Pow.java   |   2 +-
 .../tajo/engine/function/math/Radians.java      |   2 +-
 .../apache/tajo/engine/function/math/Round.java |   2 +-
 .../tajo/engine/function/math/RoundFloat8.java  |   2 +-
 .../apache/tajo/engine/function/math/Sign.java  |   2 +-
 .../apache/tajo/engine/function/math/Sin.java   |   2 +-
 .../apache/tajo/engine/function/math/Sqrt.java  |   2 +-
 .../apache/tajo/engine/function/math/Tan.java   |   2 +-
 .../tajo/engine/function/string/FindInSet.java  |   2 +-
 .../tajo/engine/function/string/InitCap.java    |   4 +-
 .../tajo/engine/function/string/LTrim.java      |   4 +-
 .../tajo/engine/function/string/Left.java       |   8 +-
 .../tajo/engine/function/string/Length.java     |   2 +-
 .../tajo/engine/function/string/Locate.java     |   4 +-
 .../tajo/engine/function/string/Lower.java      |   4 +-
 .../tajo/engine/function/string/Lpad.java       |   8 +-
 .../apache/tajo/engine/function/string/Md5.java |   4 +-
 .../engine/function/string/OctetLength.java     |   2 +-
 .../tajo/engine/function/string/QuoteIdent.java |   2 +-
 .../tajo/engine/function/string/RTrim.java      |   4 +-
 .../engine/function/string/RegexpReplace.java   |   6 +-
 .../tajo/engine/function/string/Repeat.java     |   4 +-
 .../tajo/engine/function/string/Reverse.java    |   4 +-
 .../tajo/engine/function/string/Right.java      |   8 +-
 .../tajo/engine/function/string/Rpad.java       |   8 +-
 .../tajo/engine/function/string/StrPos.java     |   2 +-
 .../tajo/engine/function/string/StrPosb.java    |   4 +-
 .../tajo/engine/function/string/Substr.java     |   2 +-
 .../tajo/engine/function/string/ToBin.java      |   2 +-
 .../tajo/engine/function/string/ToHex.java      |   2 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   1 +
 .../tajo/engine/planner/ExprAnnotator.java      |   2 +-
 .../engine/planner/LogicalPlanPreprocessor.java |   2 +-
 .../tajo/engine/planner/LogicalPlanner.java     |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   6 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  13 +-
 .../engine/planner/UniformRangePartition.java   |   3 +-
 .../tajo/engine/planner/global/DataChannel.java |   2 +-
 .../engine/planner/global/GlobalPlanner.java    |   6 +-
 .../tajo/engine/planner/logical/ScanNode.java   |   2 +-
 .../planner/logical/TableSubQueryNode.java      |   2 +-
 .../join/GreedyHeuristicJoinOrderAlgorithm.java |   2 +-
 .../planner/physical/BSTIndexScanExec.java      |   2 +-
 .../planner/physical/ColPartitionStoreExec.java |   5 +-
 .../planner/physical/ExternalSortExec.java      | 345 ++---------
 .../planner/physical/HashFullOuterJoinExec.java |   2 +-
 .../engine/planner/physical/HashJoinExec.java   |   2 +-
 .../planner/physical/HashLeftOuterJoinExec.java |   2 +-
 .../planner/physical/JoinTupleComparator.java   |   6 +-
 .../planner/physical/MemTableScanner.java       | 128 ++++
 .../physical/MergeFullOuterJoinExec.java        |   4 +-
 .../engine/planner/physical/MergeJoinExec.java  |   4 +-
 .../engine/planner/physical/PairWiseMerger.java | 250 ++++++++
 .../physical/RangeShuffleFileWriteExec.java     |   4 +-
 .../physical/RightOuterMergeJoinExec.java       |   4 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../tajo/engine/planner/physical/SortExec.java  |  11 +-
 .../engine/planner/physical/StoreTableExec.java |   5 +-
 .../engine/planner/physical/WindowAggExec.java  |   8 +-
 .../planner/rewrite/ProjectionPushDownRule.java |   2 +-
 .../apache/tajo/engine/utils/SchemaUtil.java    |  88 ---
 .../worker/ExecutionBlockSharedResource.java    |  37 +-
 .../tajo/worker/RangeRetrieverHandler.java      |   6 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  11 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |  12 +-
 .../codegen/TestTupleComparerCompiler.java      | 352 +++++++++++
 .../apache/tajo/engine/eval/ExprTestBase.java   |   2 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |   8 +-
 .../planner/TestUniformRangePartition.java      |   4 +-
 .../planner/physical/TestBSTIndexExec.java      |   4 +-
 .../planner/physical/TestExternalSortExec.java  | 126 ++--
 .../planner/physical/TestPairWiseMerger.java    | 316 ++++++++++
 .../planner/physical/TestPhysicalPlanner.java   |   2 +-
 .../physical/TestProgressExternalSortExec.java  |   4 +-
 .../engine/planner/physical/TestSortExec.java   |   2 +-
 .../apache/tajo/engine/util/TestTupleUtil.java  |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   4 +-
 tajo-dist/pom.xml                               |   1 +
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |  17 +-
 tajo-project/pom.xml                            |   7 +
 .../tajo/storage/BaseTupleComparator.java       | 205 +++++++
 .../storage/BinarySerializerDeserializer.java   |   2 +-
 .../org/apache/tajo/storage/FrameTuple.java     |  14 +-
 .../java/org/apache/tajo/storage/LazyTuple.java |  13 +-
 .../org/apache/tajo/storage/MemoryUtil.java     | 163 ------
 .../java/org/apache/tajo/storage/RawFile.java   |   7 +-
 .../org/apache/tajo/storage/RowStoreUtil.java   | 202 +++++--
 .../org/apache/tajo/storage/StorageUtil.java    |  12 -
 .../apache/tajo/storage/TableStatistics.java    |   8 +-
 .../storage/TextSerializerDeserializer.java     |   2 +-
 .../java/org/apache/tajo/storage/Tuple.java     |   8 +-
 .../apache/tajo/storage/TupleComparator.java    | 164 +-----
 .../org/apache/tajo/storage/TupleRange.java     |   4 +-
 .../java/org/apache/tajo/storage/VTuple.java    |  56 +-
 .../apache/tajo/storage/avro/AvroAppender.java  |  17 +-
 .../apache/tajo/storage/index/IndexMethod.java  |   6 +-
 .../apache/tajo/storage/index/bst/BSTIndex.java |  22 +-
 .../storage/rawfile/DirectRawFileScanner.java   | 205 +++++++
 .../storage/rawfile/DirectRawFileWriter.java    | 182 ++++++
 .../org/apache/tajo/tuple/BaseTupleBuilder.java | 110 ++++
 .../org/apache/tajo/tuple/RowBlockReader.java   |  33 ++
 .../org/apache/tajo/tuple/TupleBuilder.java     |  26 +
 .../tajo/tuple/offheap/DirectBufTuple.java      |  41 ++
 .../tajo/tuple/offheap/FixedSizeLimitSpec.java  |  32 +
 .../apache/tajo/tuple/offheap/HeapTuple.java    | 269 +++++++++
 .../tajo/tuple/offheap/OffHeapMemory.java       | 102 ++++
 .../tajo/tuple/offheap/OffHeapRowBlock.java     | 176 ++++++
 .../tuple/offheap/OffHeapRowBlockReader.java    |  63 ++
 .../tuple/offheap/OffHeapRowBlockUtils.java     |  54 ++
 .../tuple/offheap/OffHeapRowBlockWriter.java    |  58 ++
 .../tajo/tuple/offheap/OffHeapRowWriter.java    | 230 ++++++++
 .../tajo/tuple/offheap/ResizableLimitSpec.java  | 142 +++++
 .../apache/tajo/tuple/offheap/RowWriter.java    |  73 +++
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  | 308 ++++++++++
 .../offheap/UnSafeTupleBytesComparator.java     |  99 ++++
 .../tajo/tuple/offheap/ZeroCopyTuple.java       |  35 ++
 tajo-storage/src/main/proto/IndexProtos.proto   |   4 +-
 .../src/main/resources/storage-default.xml      |  16 +-
 .../tajo/storage/TestTupleComparator.java       |   2 +-
 .../apache/tajo/storage/index/TestBSTIndex.java | 151 ++---
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../tajo/storage/raw/TestDirectRawFile.java     | 318 ++++++++++
 .../apache/tajo/storage/v2/TestStorages.java    |   2 +-
 .../apache/tajo/tuple/TestBaseTupleBuilder.java |  76 +++
 .../tajo/tuple/offheap/TestHeapTuple.java       |  45 ++
 .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 +++++++++++++++++++
 .../tajo/tuple/offheap/TestResizableSpec.java   |  59 ++
 .../src/test/resources/log4j.properties         |  28 +
 .../src/test/resources/storage-default.xml      |  16 +-
 tajo-thirdparty/asm/pom.xml                     |   2 +-
 .../tajo/pullserver/PullServerAuxService.java   |   4 +-
 .../tajo/pullserver/TajoPullServerService.java  |   4 +-
 .../java/org/apache/tajo/storage/Tuple.java     |   5 +
 180 files changed, 6450 insertions(+), 1249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61fcec0..4257e59 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,5 +1,12 @@
 Tajo Change Log 
 
+Block Iteration - branch
+
+  IMPROVEMENT
+    
+    TAJO-907: Implement off-heap tuple block and zero-copy tuple. 
+    (hyunsik)
+
 Release 0.9.0 - unreleased
 
   NEW FEATURES 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
new file mode 100644
index 0000000..ee670ef
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+
+public class SchemaUtil {
+  // See TAJO-914 bug.
+  //
+  // The essential problem is that a constant value is evaluated multiple times at each scan.
+  // As a result, join nodes can take child nodes that have identically named fields.
+  // Because the current schema does not allow the same name and ignores the duplicated schema,
+  // it finally causes an in-out schema mismatch between the parent and child nodes.
+  //
+  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as differently named fields.
+  // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
+  static int tmpColumnSeq = 0;
+  public static Schema merge(Schema left, Schema right) {
+    Schema merged = new Schema();
+    for(Column col : left.getColumns()) {
+      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn(col);
+      }
+    }
+    for(Column col : right.getColumns()) {
+      if (merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
+      } else {
+        merged.addColumn(col);
+      }
+    }
+
+    // reset the sequence if it has overflowed to a negative value
+    if (tmpColumnSeq < 0) {
+      tmpColumnSeq = 0;
+    }
+    return merged;
+  }
+
+  /**
+   * Get common columns to be used as join keys of natural joins.
+   */
+  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
+    Schema common = new Schema();
+    for (Column outer : left.getColumns()) {
+      if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
+        common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
+      }
+    }
+    
+    return common;
+  }
+
+  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
+    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    if (tableName != null) {
+      logicalSchema.setQualifier(tableName);
+    }
+    return logicalSchema;
+  }
+
+  public static <T extends Schema> T clone(Schema schema) {
+    try {
+      T copy = (T) schema.clone();
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static DataType [] toDataTypes(Schema schema) {
+    DataType[] types = new DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      types[i] = schema.getColumn(i).getDataType();
+    }
+    return types;
+  }
+
+  public static Type [] toTypes(Schema schema) {
+    Type [] types = new Type[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      types[i] = schema.getColumn(i).getDataType().getType();
+    }
+    return types;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 367d0b8..c45e1ef 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -29,13 +29,14 @@ enum StoreType {
   MEM = 0;
   CSV = 1;
   RAW = 2;
-  RCFILE = 3;
-  ROWFILE = 4;
-  HCFILE = 5;
-  TREVNI = 6;
-  PARQUET = 7;
-  SEQUENCEFILE = 8;
-  AVRO = 9;
+  DIRECTRAW = 3;
+  RCFILE = 4;
+  ROWFILE = 5;
+  HCFILE = 6;
+  TREVNI = 7;
+  PARQUET = 8;
+  SEQUENCEFILE = 9;
+  AVRO = 10;
 }
 
 enum OrderType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index d189c78..331d743 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -51,7 +51,7 @@ public abstract class TajoResultSetBase implements ResultSet {
   }
 
   private void handleNull(Datum d) {
-    wasNull = (d instanceof NullDatum);
+    wasNull = d.isNull();
   }
 
   public Tuple getCurrentTuple() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
index b26ef84..02f045c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DateDatum.java
@@ -251,7 +251,7 @@ public class DateDatum extends Datum {
       }
 
       return (dayOfMonth < another.dayOfMonth) ? -1 : ((dayOfMonth == another.dayOfMonth) ? 0 : 1);
-    } else if (datum instanceof NullDatum || datum.isNull()) {
+    } else if (datum.isNull()) {
       return -1;
     } else {
       throw new InvalidOperationException(datum.type());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
index 442db71..c37ce55 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
@@ -155,7 +155,7 @@ public abstract class Datum implements Comparable<Datum>, GsonObject {
   }
 
   public Datum equalsTo(Datum datum) {
-    if (this instanceof NullDatum || datum instanceof NullDatum) {
+    if (this.isNull() || datum.isNull()) {
       return NullDatum.get();
     } else {
       return DatumFactory.createBool(compareTo(datum) == 0);
@@ -163,7 +163,7 @@ public abstract class Datum implements Comparable<Datum>, GsonObject {
   }
 
   public Datum notEqualsTo(Datum datum) {
-    if (this instanceof NullDatum || datum instanceof NullDatum) {
+    if (this.isNull() || datum.isNull()) {
       return NullDatum.get();
     } else {
       return DatumFactory.createBool(compareTo(datum) != 0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java
index c6f3922..d06fddf 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/IntervalDatum.java
@@ -408,7 +408,7 @@ public class IntervalDatum extends Datum {
       } else {
         return 0;
       }
-    } else if (datum instanceof NullDatum || datum.isNull()) {
+    } else if (datum.isNull()) {
       return -1;
     } else {
       throw new InvalidOperationException(datum.type());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index ca76ed2..9a9b37e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -30,7 +30,7 @@ import java.nio.charset.Charset;
 import java.util.Comparator;
 
 public class TextDatum extends Datum {
-  static Charset defaultCharset = Charset.forName("UTF-8");
+  public static Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
   @Expose private final int size;
   /* encoded in UTF-8 */
@@ -47,7 +47,7 @@ public class TextDatum extends Datum {
   }
 
   public TextDatum(String string) {
-    this(string.getBytes(defaultCharset));
+    this(string.getBytes(DEFAULT_CHARSET));
   }
 
   @Override
@@ -92,7 +92,7 @@ public class TextDatum extends Datum {
 
   @Override
   public String asChars() {
-    return new String(this.bytes, defaultCharset);
+    return new String(this.bytes, DEFAULT_CHARSET);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
index 37e5e78..4b1eb4f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimeDatum.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.datum;
 
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.InvalidCastException;
 import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.util.Bytes;
@@ -28,12 +27,14 @@ import org.apache.tajo.util.datetime.TimeMeta;
 
 import java.util.TimeZone;
 
+import static org.apache.tajo.common.TajoDataTypes.Type;
+
 public class TimeDatum extends Datum {
   public static final int SIZE = 8;
   private final long time;
 
   public TimeDatum(long time) {
-    super(TajoDataTypes.Type.TIME);
+    super(Type.TIME);
     this.time = time;
   }
 
@@ -71,7 +72,7 @@ public class TimeDatum extends Datum {
 
   @Override
   public int asInt4() {
-    throw new InvalidCastException();
+    throw new InvalidCastException(Type.TIME, Type.INT4);
   }
 
   @Override
@@ -81,12 +82,12 @@ public class TimeDatum extends Datum {
 
   @Override
   public float asFloat4() {
-    throw new InvalidCastException();
+    throw new InvalidCastException(Type.TIME, Type.FLOAT4);
   }
 
   @Override
   public double asFloat8() {
-    throw new InvalidCastException();
+    throw new InvalidCastException(Type.TIME, Type.FLOAT8);
   }
 
   @Override
@@ -162,7 +163,7 @@ public class TimeDatum extends Datum {
 
   @Override
   public Datum equalsTo(Datum datum) {
-    if (datum.type() == TajoDataTypes.Type.TIME) {
+    if (datum.type() == Type.TIME) {
       return DatumFactory.createBool(time == (((TimeDatum) datum).time));
     } else if (datum.isNull()) {
       return datum;
@@ -173,10 +174,10 @@ public class TimeDatum extends Datum {
 
   @Override
   public int compareTo(Datum datum) {
-    if (datum.type() == TajoDataTypes.Type.TIME) {
+    if (datum.type() == Type.TIME) {
       TimeDatum another = (TimeDatum)datum;
       return (time < another.time) ? -1 : ((time == another.time) ? 0 : 1);
-    } else if (datum instanceof NullDatum || datum.isNull()) {
+    } else if (datum.isNull()) {
       return -1;
     } else {
       throw new InvalidOperationException(datum.type());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
new file mode 100644
index 0000000..8202a4b
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Deallocatable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+public interface Deallocatable {
+  void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 9aa6af9..18083e2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -127,7 +127,19 @@ public class FileUtil {
     return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
   }
 
-  public static boolean isLocalPath(Path path) {
-    return path.toUri().getScheme().equals("file");
+  /**
+   * Get file extension
+   *
+   * @param name FileName
+   * @return File Extension
+   */
+  public static String getExtension(String name) {
+    int lastDelimiterIdx = name.lastIndexOf(".");
+    if (lastDelimiterIdx > -1) {
+      // plus one means skipping a delimiter.
+      return name.substring(lastDelimiterIdx + 1, name.length());
+    } else {
+      return "";
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
new file mode 100644
index 0000000..5e10deb
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/SizeOf.java
@@ -0,0 +1,159 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import static sun.misc.Unsafe.*;
+
+
+public final class SizeOf {
+
+  public static final int BITS_PER_BYTE = 8;
+  public static final int BYTES_PER_WORD = SizeOf.SIZE_OF_LONG;
+  public static final int BITS_PER_WORD = SizeOf.SIZE_OF_LONG * BITS_PER_BYTE;
+
+  public static final byte SIZE_OF_BOOL = 1;
+  public static final byte SIZE_OF_BYTE = 1;
+  public static final byte SIZE_OF_SHORT = 2;
+  public static final byte SIZE_OF_INT = 4;
+  public static final byte SIZE_OF_LONG = 8;
+  public static final byte SIZE_OF_FLOAT = 4;
+  public static final byte SIZE_OF_DOUBLE = 8;
+
+  public static long sizeOf(boolean[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(byte[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(short[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(char[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(int[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(long[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(float[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(double[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * array.length);
+  }
+
+  public static long sizeOf(Object[] array)
+  {
+    if (array == null) {
+      return 0;
+    }
+    return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * array.length);
+  }
+
+
+  public static long sizeOfBooleanArray(int length)
+  {
+    return ARRAY_BOOLEAN_BASE_OFFSET + (((long) ARRAY_BOOLEAN_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfByteArray(int length)
+  {
+    return ARRAY_BYTE_BASE_OFFSET + (((long) ARRAY_BYTE_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfShortArray(int length)
+  {
+    return ARRAY_SHORT_BASE_OFFSET + (((long) ARRAY_SHORT_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfCharArray(int length)
+  {
+    return ARRAY_CHAR_BASE_OFFSET + (((long) ARRAY_CHAR_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfIntArray(int length)
+  {
+    return ARRAY_INT_BASE_OFFSET + (((long) ARRAY_INT_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfLongArray(int length)
+  {
+    return ARRAY_LONG_BASE_OFFSET + (((long) ARRAY_LONG_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfFloatArray(int length)
+  {
+    return ARRAY_FLOAT_BASE_OFFSET + (((long) ARRAY_FLOAT_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfDoubleArray(int length)
+  {
+    return ARRAY_DOUBLE_BASE_OFFSET + (((long) ARRAY_DOUBLE_INDEX_SCALE) * length);
+  }
+
+  public static long sizeOfObjectArray(int length)
+  {
+    return ARRAY_OBJECT_BASE_OFFSET + (((long) ARRAY_OBJECT_INDEX_SCALE) * length);
+  }
+
+  private SizeOf()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
new file mode 100644
index 0000000..fec4e5a
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeComparer.java
@@ -0,0 +1,160 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedLongs;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Comparator;
+
+/**
+ * Compares byte arrays lexicographically as unsigned bytes, reading eight bytes at a time
+ * through {@code sun.misc.Unsafe}. This is borrowed from Guava's UnsignedBytes.
+ */
+public class UnsafeComparer implements Comparator<byte[]> {
+
+  /** The shared instance; the constructor is private. */
+  public static final UnsafeComparer INSTANCE = new UnsafeComparer();
+
+  public Comparator<byte []> get() {
+    return INSTANCE;
+  }
+
+  private UnsafeComparer() {}
+
+  /** Whether the native byte order of this platform is little-endian. */
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  /*
+   * The following static final fields exist for performance reasons.
+   *
+   * In UnsignedBytesBenchmark, accessing the following objects via static final fields is the
+   * fastest (more than twice as fast as the Java implementation, vs ~1.5x with non-final static
+   * fields, on x86_32) under the Hotspot server compiler. The reason is obviously that the
+   * non-final fields need to be reloaded inside the loop.
+   *
+   * And, no, defining (final or not) local variables out of the loop still isn't as good because
+   * the null check on the theUnsafe object remains inside the loop and BYTE_ARRAY_BASE_OFFSET
+   * doesn't get constant-folded.
+   *
+   * The compiler can treat static final fields as compile-time constants and can constant-fold
+   * them while (final or not) local variables are run time values.
+   */
+  static final Unsafe theUnsafe;
+
+  /** The offset to the first element in a byte array. */
+  static final int BYTE_ARRAY_BASE_OFFSET;
+
+  static {
+    theUnsafe = (Unsafe) AccessController.doPrivileged(
+        new PrivilegedAction<Object>() {
+          @Override
+          public Object run() {
+            try {
+              Field f = Unsafe.class.getDeclaredField("theUnsafe");
+              f.setAccessible(true);
+              return f.get(null);
+            } catch (NoSuchFieldException e) {
+              // Preserve the cause so a failed class initialization explains itself.
+              throw new Error(e);
+            } catch (IllegalAccessException e) {
+              throw new Error(e);
+            }
+          }
+        });
+
+    BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+
+    // sanity check - this should never fail
+    if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
+      throw new AssertionError();
+    }
+  }
+
+  /** Static shortcut for {@code INSTANCE.compare(left, right)}. */
+  @SuppressWarnings("unused")
+  public static int compareStatic(byte[] left, byte[] right) {
+    return INSTANCE.compare(left, right);
+  }
+
+  /**
+   * Orders by the first differing byte, taken as unsigned; if one array is a prefix of the
+   * other, the shorter array sorts first. Throws NullPointerException if either array is null.
+   */
+  @Override public int compare(byte[] left, byte[] right) {
+    int minLength = Math.min(left.length, right.length);
+    int wordBytes = (minLength / Longs.BYTES) * Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < wordBytes; i += Longs.BYTES) {
+      long lw = theUnsafe.getLong(left, BYTE_ARRAY_BASE_OFFSET + (long) i);
+      long rw = theUnsafe.getLong(right, BYTE_ARRAY_BASE_OFFSET + (long) i);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Little-endian: binary-search for n, the bit offset of the lowest-order
+        // differing byte, which is the first differing byte in array order.
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = wordBytes; i < minLength; i++) {
+      int result = UnsignedBytes.compare(left[i], right[i]);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return left.length - right.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
new file mode 100644
index 0000000..05c9fcc
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.util;
+
+import com.google.common.base.Preconditions;
+import sun.misc.Cleaner;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+/** Static helpers around {@code sun.misc.Unsafe} and direct {@code ByteBuffer}s. */
+public class UnsafeUtil {
+  /** The {@code sun.misc.Unsafe} instance, read reflectively from its {@code theUnsafe} field. */
+  public static final Unsafe unsafe;
+
+  static {
+    try {
+      Field field = Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      unsafe = (Unsafe) field.get(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    // Checked outside the try block so that this failure is not wrapped a second time.
+    if (unsafe == null) {
+      throw new RuntimeException("Unsafe access not available");
+    }
+  }
+
+  /** Rounds {@code size} up to a multiple of SizeOf.BYTES_PER_WORD; no positive remainder: as is. */
+  public static int alignedSize(int size) {
+    int remain = size % SizeOf.BYTES_PER_WORD;
+    return remain > 0 ? size + (SizeOf.BYTES_PER_WORD - remain) : size;
+  }
+
+  /** Returns {@code address()} of a direct buffer; other buffers raise IllegalArgumentException. */
+  public static long getAddress(ByteBuffer buffer) {
+    Preconditions.checkArgument(buffer instanceof DirectBuffer, "ByteBuffer must be DirectBuffer");
+    return ((DirectBuffer)buffer).address();
+  }
+
+  /** Releases {@code obj} by calling its {@code release()} method. */
+  public static void free(Deallocatable obj) {
+    obj.release();
+  }
+
+  /** Runs the cleaner of a direct buffer, if it has one; rejects null and non-direct buffers. */
+  public static void free(ByteBuffer bb) {
+    Preconditions.checkNotNull(bb);
+    Preconditions.checkState(bb instanceof DirectBuffer, "ByteBuffer must be DirectBuffer");
+    Cleaner cleaner = ((DirectBuffer) bb).cleaner();
+    if (cleaner != null) {
+      cleaner.clean();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index d33b598..03a7348 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -243,6 +243,14 @@
     <dependency>
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-storage</artifactId>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CodeGenUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CodeGenUtils.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CodeGenUtils.java
index 84a93fc..a2813f3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CodeGenUtils.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CodeGenUtils.java
@@ -19,12 +19,19 @@
 package org.apache.tajo.engine.codegen;
 
 import org.apache.tajo.org.objectweb.asm.ClassReader;
+import org.apache.tajo.org.objectweb.asm.MethodVisitor;
+import org.apache.tajo.org.objectweb.asm.Opcodes;
 import org.apache.tajo.org.objectweb.asm.util.ASMifier;
 import org.apache.tajo.org.objectweb.asm.util.TraceClassVisitor;
 
+import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+/**
+ * It includes utility methods, and some of them are only used in generated code.
+ * So, they appear to be not used in the project.
+ */
 public class CodeGenUtils {
   private static final int FLAGS = ClassReader.SKIP_DEBUG;
 
@@ -38,4 +45,21 @@ public class CodeGenUtils {
 
     return strWriter.toString();
   }
+
+  /**
+   * Emits instructions that load {@code System.out} and call {@code println(String)} on it.
+   *
+   * @param mv      method visitor that receives the instructions
+   * @param message text loaded as a constant and passed to {@code println}
+   */
+  @SuppressWarnings("unused")
+  public static void emitPrintOut(MethodVisitor mv, String message) {
+    mv.visitFieldInsn(Opcodes.GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;");
+    mv.visitLdcInsn(message);
+
+    final String printStreamName = TajoGeneratorAdapter.getInternalName(PrintStream.class);
+    final String printlnDesc =
+        TajoGeneratorAdapter.getMethodDescription(void.class, new Class[]{String.class});
+    mv.visitMethodInsn(Opcodes.INVOKEVIRTUAL, printStreamName, "println", printlnDesc);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CompilationError.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CompilationError.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CompilationError.java
index 27054de..325ed2d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CompilationError.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/CompilationError.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.codegen;
 
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.storage.BaseTupleComparator;
 
 public class CompilationError extends RuntimeException {
   public CompilationError(String message) {
@@ -28,4 +29,21 @@ public class CompilationError extends RuntimeException {
   public CompilationError(EvalNode evalNode, Throwable t, byte [] clazz) {
     super("Compilation Error: " + evalNode.toString() + "\n\nBYTES CODE DUMP:\n" + CodeGenUtils.disassemble(clazz), t);
   }
+
+  /**
+   * Creates an error for a tuple comparator, embedding a dump of the given class bytes.
+   *
+   * @param comp  comparator whose string form is put into the message
+   * @param t     underlying failure, passed on as the cause
+   * @param clazz bytes handed to {@code CodeGenUtils.disassemble} for the dump
+   */
+  public CompilationError(BaseTupleComparator comp, Throwable t, byte [] clazz) {
+    super(describe(comp, clazz), t);
+  }
+
+  /** Builds the message: the comparator's string form, then the disassembled bytes. */
+  private static String describe(BaseTupleComparator comp, byte [] clazz) {
+    final String header = "Compilation Error: " + comp.toString();
+    return header + "\n\nBYTES CODE DUMP:\n" + CodeGenUtils.disassemble(clazz);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
index 03c0402..1e51ba5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenContext.java
@@ -62,7 +62,7 @@ public class EvalCodeGenContext extends TajoGeneratorAdapter {
   }
 
   public void emitClassDefinition() {
-    classWriter.visit(Opcodes.V1_5, Opcodes.ACC_PUBLIC, this.owner, null,
+    classWriter.visit(Opcodes.V1_6, Opcodes.ACC_PUBLIC, this.owner, null,
         TajoGeneratorAdapter.getInternalName(EvalNode.class), null);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java
index c57f923..2948dec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/EvalCodeGenerator.java
@@ -24,19 +24,20 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.org.objectweb.asm.*;
+import org.apache.tajo.org.objectweb.asm.ClassWriter;
+import org.apache.tajo.org.objectweb.asm.Label;
+import org.apache.tajo.org.objectweb.asm.Opcodes;
+import org.apache.tajo.org.objectweb.asm.Type;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 
-import java.io.PrintStream;
 import java.lang.reflect.Constructor;
 import java.util.Stack;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
-import static org.apache.tajo.engine.codegen.TajoGeneratorAdapter.*;
+import static org.apache.tajo.engine.codegen.TajoGeneratorAdapter.getDescription;
 import static org.apache.tajo.engine.eval.FunctionEval.ParamType;
 
 public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext> {
@@ -96,12 +97,6 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
     return compiledEval;
   }
 
-  private void printOut(EvalCodeGenContext context, String message) {
-    context.methodvisitor.visitFieldInsn(Opcodes.GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;");
-    context.push(message);
-    context.invokeVirtual(PrintStream.class, "println", void.class, new Class[]{String.class});
-  }
-
   public EvalNode visitBinaryEval(EvalCodeGenContext context, Stack<EvalNode> stack, BinaryEval binaryEval) {
     if (EvalType.isLogicalOperator(binaryEval.getType())) {
       return visitAndOrEval(context, binaryEval, stack);
@@ -133,17 +128,17 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
     } else if (unary.getType() == EvalType.NOT) {
 
       visit(context, unary.getChild(), stack);
-      context.methodvisitor.visitVarInsn(Opcodes.ISTORE, 9);
-      context.methodvisitor.visitVarInsn(Opcodes.ISTORE, 10);
+      int nullFlagId = context.istore();
+      int valueId = context.istore();
 
       Label ifNull = new Label();
       Label endIf = new Label();
 
-      context.emitNullityCheck(ifNull, 9);
+      context.emitNullityCheck(ifNull, nullFlagId);
 
       context.methodvisitor.visitFieldInsn(Opcodes.GETSTATIC, Type.getInternalName(EvalCodeGenerator.class),
           "NOT_LOGIC", "[B");
-      context.methodvisitor.visitVarInsn(Opcodes.ILOAD, 10);
+      context.iload(valueId);
       context.methodvisitor.visitInsn(Opcodes.BALOAD);
       context.pushNullFlag(true);
       emitGotoLabel(context, endIf);
@@ -209,9 +204,9 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
     final int BEGIN_NULLFLAG = context.istore();
     final int BEGIN = context.store(begin.getValueType());
 
-    visit(context, end, stack);                                         // < end, right_nullflag
+    visit(context, end, stack);
     final int END_NULLFLAG = context.istore();
-    final int END = context.store(end.getValueType());                                // <
+    final int END = context.store(end.getValueType());
 
     stack.pop();
 
@@ -339,7 +334,6 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
     context.pop(srcType);
     context.pushDummyValue(targetType);
     context.pushNullFlag(false);
-    printOut(context, "endIfNull");
 
     emitLabel(context, afterEnd);
     return cast;
@@ -361,8 +355,7 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
       }
 
       context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 2);
-      context.push(fieldIdx);
-      context.invokeInterface(Tuple.class, "isNull", boolean.class, new Class [] {int.class});
+      context.emitIsNullOfTuple(fieldIdx);
 
       context.push(true);
 
@@ -370,69 +363,8 @@ public class EvalCodeGenerator extends SimpleEvalNodeVisitor<EvalCodeGenContext>
       Label afterAll = new Label();
       context.methodvisitor.visitJumpInsn(Opcodes.IF_ICMPEQ, ifNull);
 
-      String methodName = null;
-      Class returnType = null;
-      Class [] paramTypes = null;
-      switch (field.getValueType().getType()) {
-      case BOOLEAN:
-        methodName = "getByte";
-        returnType = byte.class;
-        paramTypes = new Class[] {int.class};
-        break;
-      case CHAR: {
-        methodName = "getText";
-        returnType = String.class;
-        paramTypes = new Class[] {int.class};
-        break;
-      }
-      case INT1:
-      case INT2:
-      case INT4:
-      case DATE:
-      case INET4:
-        methodName = "getInt4";
-        returnType = int.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case INT8:
-      case TIMESTAMP:
-      case TIME:
-        methodName = "getInt8";
-        returnType = long.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case FLOAT4:
-        methodName = "getFloat4";
-        returnType = float.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case FLOAT8:
-        methodName = "getFloat8";
-        returnType = double.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case TEXT:
-        methodName = "getText";
-        returnType = String.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case INTERVAL:
-        methodName = "getInterval";
-        returnType = IntervalDatum.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      case PROTOBUF:
-        methodName = "getProtobufDatum";
-        returnType = ProtobufDatum.class;
-        paramTypes = new Class [] {int.class};
-        break;
-      default:
-        throw new InvalidEvalException(field.getValueType() + " is not supported yet");
-      }
-
       context.methodvisitor.visitVarInsn(Opcodes.ALOAD, 2);
-      context.push(fieldIdx);
-      context.invokeInterface(Tuple.class, methodName, returnType, paramTypes);
+      context.emitGetValueOfTuple(columnRef.getDataType(), fieldIdx);
 
       context.pushNullFlag(true); // not null
       context.methodvisitor.visitJumpInsn(Opcodes.GOTO, afterAll);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
index 54d857b..3de0bf1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
@@ -25,6 +25,8 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.Pair;
 
 import java.util.Collections;
@@ -42,45 +44,73 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp
 
   public static void compile(CompilationContext context, LogicalNode node) throws PlanningException {
     instance.visit(context, null, null, node, new Stack<LogicalNode>());
-    context.compiledEval = Collections.unmodifiableMap(context.compiledEval);
+    context.compiledEvals = Collections.unmodifiableMap(context.compiledEvals);
   }
 
   public static Map<Pair<Schema, EvalNode>, EvalNode> compile(TajoClassLoader classLoader, LogicalNode node)
       throws PlanningException {
     CompilationContext context = new CompilationContext(classLoader);
     instance.visit(context, null, null, node, new Stack<LogicalNode>());
-    return context.compiledEval;
+    return context.compiledEvals;
   }
 
   public static class CompilationContext {
-    private final EvalCodeGenerator compiler;
-    private Map<Pair<Schema,EvalNode>, EvalNode> compiledEval;
+    private final EvalCodeGenerator evalCompiler;
+    private final TupleComparerCompiler comparerCompiler;
+    private Map<Pair<Schema,EvalNode>, EvalNode> compiledEvals;
+    private Map<Pair<Schema,BaseTupleComparator>, TupleComparator> compiledComparators;
 
     public CompilationContext(TajoClassLoader classLoader) {
-      this.compiler = new EvalCodeGenerator(classLoader);
-      this.compiledEval = Maps.newHashMap();
+      this.evalCompiler = new EvalCodeGenerator(classLoader);
+      this.comparerCompiler = new TupleComparerCompiler(classLoader);
+      this.compiledEvals = Maps.newHashMap();
+      this.compiledComparators = Maps.newHashMap();
     }
 
-    public EvalCodeGenerator getCompiler() {
-      return compiler;
+    public EvalCodeGenerator getEvalCompiler() {
+      return evalCompiler;
+    }
+
+    public TupleComparerCompiler getComparatorCompiler() {
+      return comparerCompiler;
     }
 
     public Map<Pair<Schema, EvalNode>, EvalNode> getPrecompiedEvals() {
-      return compiledEval;
+      return compiledEvals;
+    }
+
+    public Map<Pair<Schema, BaseTupleComparator>, TupleComparator> getPrecompiedComparators() {
+      return compiledComparators;
     }
   }
 
   private static void compileIfAbsent(CompilationContext context, Schema schema, EvalNode eval) {
     Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval);
-    if (!context.compiledEval.containsKey(key)) {
+    if (!context.compiledEvals.containsKey(key)) {
+      try {
+        EvalNode compiled = context.evalCompiler.compile(schema, eval);
+        context.compiledEvals.put(key, compiled);
+
+      } catch (Throwable t) {
+        // If any compilation error occurs, it works in a fallback mode. This mode just uses EvalNode objects
+        // instead of a compiled EvalNode.
+        context.compiledEvals.put(key, eval);
+        LOG.warn(t);
+      }
+    }
+  }
+
+  private static void compileIfAbsent(CompilationContext context, Schema schema, BaseTupleComparator comparator) {
+    Pair<Schema, BaseTupleComparator> key = new Pair<Schema, BaseTupleComparator>(schema, comparator);
+    if (!context.compiledComparators.containsKey(key)) {
       try {
-        EvalNode compiled = context.compiler.compile(schema, eval);
-        context.compiledEval.put(key, compiled);
+        TupleComparator compiled = context.comparerCompiler.compile(comparator, false);
+        context.compiledComparators.put(key, compiled);
 
       } catch (Throwable t) {
         // If any compilation error occurs, it works in a fallback mode. This mode just uses EvalNode objects
         // instead of a compiled EvalNode.
-        context.compiledEval.put(key, eval);
+        context.compiledComparators.put(key, comparator);
         LOG.warn(t);
       }
     }
@@ -116,6 +146,17 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp
   }
 
   @Override
+  public LogicalNode visitSort(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                               SortNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitSort(context, plan, block, node, stack);
+
+    BaseTupleComparator comparator = new BaseTupleComparator(node.getInSchema(), node.getSortKeys());
+    compileIfAbsent(context, node.getInSchema(), comparator);
+
+    return node;
+  }
+
+  @Override
   public LogicalNode visitHaving(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  HavingNode node, Stack<LogicalNode> stack) throws PlanningException {
     super.visitHaving(context, plan, block, node, stack);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TajoGeneratorAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TajoGeneratorAdapter.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TajoGeneratorAdapter.java
index 6fac1a8..c1c4801 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TajoGeneratorAdapter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TajoGeneratorAdapter.java
@@ -20,12 +20,14 @@ package org.apache.tajo.engine.codegen;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalType;
 import org.apache.tajo.exception.InvalidCastException;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.apache.tajo.org.objectweb.asm.Label;
@@ -51,6 +53,9 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, INT8, Opcodes.LADD);
     TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, FLOAT4, Opcodes.FADD);
     TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, FLOAT8, Opcodes.DADD);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, TIMESTAMP, Opcodes.LADD);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, DATE, Opcodes.IADD);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.PLUS, TIME, Opcodes.LADD);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, INT1, Opcodes.ISUB);
     TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, INT2, Opcodes.ISUB);
@@ -58,6 +63,10 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, INT8, Opcodes.LSUB);
     TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, FLOAT4, Opcodes.FSUB);
     TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, FLOAT8, Opcodes.DSUB);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, TIMESTAMP, Opcodes.LSUB);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, DATE, Opcodes.ISUB);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.MINUS, TIME, Opcodes.LSUB);
+
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.MULTIPLY, INT1, Opcodes.IMUL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.MULTIPLY, INT2, Opcodes.IMUL);
@@ -102,6 +111,10 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, FLOAT8, Opcodes.DCMPG);
     TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, TEXT, Opcodes.IF_ACMPNE);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, DATE, Opcodes.IF_ICMPEQ);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, TIME, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.EQUAL, INET4, Opcodes.IF_ICMPEQ);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, INT1, Opcodes.IF_ICMPNE);
     TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, INT2, Opcodes.IF_ICMPNE);
@@ -110,6 +123,10 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, FLOAT8, Opcodes.DCMPG);
     TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, TEXT, Opcodes.IF_ACMPNE);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, DATE, Opcodes.IF_ICMPNE);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, TIME, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.NOT_EQUAL, INET4, Opcodes.IF_ICMPNE);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT1, Opcodes.IF_ICMPLT);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT2, Opcodes.IF_ICMPLT);
@@ -117,13 +134,9 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT8, Opcodes.LCMP);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, FLOAT8, Opcodes.DCMPG);
-
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT1, Opcodes.IF_ICMPLT);
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT2, Opcodes.IF_ICMPLT);
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT4, Opcodes.IF_ICMPLT);
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, INT8, Opcodes.LCMP);
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, FLOAT4, Opcodes.FCMPL);
-    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, FLOAT8, Opcodes.DCMPG);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, DATE, Opcodes.IF_ICMPLT);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LTH, TIME, Opcodes.LCMP);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, INT1, Opcodes.IF_ICMPLE);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, INT2, Opcodes.IF_ICMPLE);
@@ -131,6 +144,9 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, INT8, Opcodes.LCMP);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, FLOAT8, Opcodes.DCMPG);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, DATE, Opcodes.IF_ICMPLE);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.LEQ, TIME, Opcodes.LCMP);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, INT1, Opcodes.IF_ICMPGT);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, INT2, Opcodes.IF_ICMPGT);
@@ -138,6 +154,9 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, INT8, Opcodes.LCMP);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, FLOAT8, Opcodes.DCMPG);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, DATE, Opcodes.IF_ICMPGT);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GTH, TIME, Opcodes.LCMP);
 
     TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, INT1, Opcodes.IF_ICMPGE);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, INT2, Opcodes.IF_ICMPGE);
@@ -145,6 +164,9 @@ class TajoGeneratorAdapter {
     TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, INT8, Opcodes.LCMP);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, FLOAT4, Opcodes.FCMPL);
     TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, FLOAT8, Opcodes.DCMPG);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, TIMESTAMP, Opcodes.LCMP);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, DATE, Opcodes.IF_ICMPGE);
+    TUtil.putToNestedMap(OpCodesMap, EvalType.GEQ, TIME, Opcodes.LCMP);
   }
 
   protected int access;
@@ -159,9 +181,79 @@ class TajoGeneratorAdapter {
     generatorAdapter = new GeneratorAdapter(methodVisitor, access, name, desc);
   }
 
+  /**
+   * Emits an interface call to {@code Tuple.isNull(int)} without pushing a field index itself.
+   */
+  public void emitIsNullOfTuple() {
+    emitIsNullOfTuple(null);
+  }
+
+  /**
+   * Emits an interface call to {@code Tuple.isNull(int)}.
+   *
+   * @param fieldIndex index to push before the call; when null, nothing is pushed by this method
+   */
+  public void emitIsNullOfTuple(@Nullable Integer fieldIndex) {
+    if (fieldIndex != null) {
+      push(fieldIndex);
+    }
+    invokeInterface(Tuple.class, "isNull", boolean.class, new Class[]{int.class});
+  }
+
+  /**
+   * Emits an interface call to {@code Tuple.isNotNull(int)} without pushing a field index itself.
+   */
+  public void emitIsNotNullOfTuple() {
+    emitIsNotNullOfTuple(null);
+  }
+
+  /**
+   * Emits an interface call to {@code Tuple.isNotNull(int)}.
+   *
+   * @param fieldIndex index to push before the call; when null, nothing is pushed by this method
+   */
+  public void emitIsNotNullOfTuple(@Nullable Integer fieldIndex) {
+    final boolean indexGiven = fieldIndex != null;
+    if (indexGiven) {
+      push(fieldIndex);
+    }
+
+    final Class[] parameterTypes = {int.class};
+    invokeInterface(Tuple.class, "isNotNull", boolean.class, parameterTypes);
+  }
+
+  /**
+   * Pushes the field index and emits the {@code Tuple} getter call that matches the given data type.
+   *
+   * @param dataType data type whose {@code getType()} selects the getter
+   * @param fieldIndex index pushed as the getter's only argument
+   * @throws UnsupportedException if the type has no getter mapping in this method
+   */
+  public void emitGetValueOfTuple(TajoDataTypes.DataType dataType, int fieldIndex) {
+    push(fieldIndex);
+
+    final TajoDataTypes.Type type = dataType.getType();
+    final String getterName;
+    final Class<?> returnType;
+
+    switch (type) {
+    case BOOLEAN:
+      getterName = "getByte";
+      returnType = byte.class;
+      break;
+    case INT1:
+    case INT2:
+      getterName = "getInt2";
+      returnType = short.class;
+      break;
+    case INT4:
+    case DATE:
+    case INET4:
+      getterName = "getInt4";
+      returnType = int.class;
+      break;
+    case INT8:
+    case TIMESTAMP:
+    case TIME:
+      getterName = "getInt8";
+      returnType = long.class;
+      break;
+    case FLOAT4:
+      getterName = "getFloat4";
+      returnType = float.class;
+      break;
+    case FLOAT8:
+      getterName = "getFloat8";
+      returnType = double.class;
+      break;
+    case CHAR:
+    case TEXT:
+      getterName = "getText";
+      returnType = String.class;
+      break;
+    case INTERVAL:
+      getterName = "getInterval";
+      returnType = Datum.class;
+      break;
+    case PROTOBUF:
+      getterName = "getProtobufDatum";
+      returnType = Datum.class;
+      break;
+    default:
+      throw new UnsupportedException("Unknown data type: " + type.name());
+    }
+
+    // Every getter takes the field index as its only argument.
+    invokeInterface(Tuple.class, getterName, returnType, new Class[]{int.class});
+  }
+
+  /**
+   * @return the method visitor held by this adapter
+   */
+  public MethodVisitor getMethodvisitor() {
+    return this.methodvisitor;
+  }
+
   public static boolean isJVMInternalInt(TajoDataTypes.DataType dataType) {
     TajoDataTypes.Type type = dataType.getType();
-    return type == BOOLEAN || type == INT1 || type == INT2 || type == INT4 || type== INET4;
+    return type == BOOLEAN || type == INT1 || type == INT2 || type == INT4 || type == DATE || type== INET4;
   }
 
   public static int getWordSize(TajoDataTypes.DataType type) {
@@ -290,6 +382,8 @@ class TajoGeneratorAdapter {
       methodvisitor.visitVarInsn(Opcodes.ILOAD, idx);
       break;
     case INT8:
+    case TIMESTAMP:
+    case TIME:
       methodvisitor.visitVarInsn(Opcodes.LLOAD, idx);
       break;
     case FLOAT4:

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleCmp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleCmp.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleCmp.java
new file mode 100644
index 0000000..71ee650
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleCmp.java
@@ -0,0 +1,31 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.codegen;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+
+public class TupleCmp extends TupleComparator {
+  @Override
+  public int compare(Tuple o1, Tuple o2) {
+    // NOTE(review): clears o1 and always reports the tuples as equal; looks like a placeholder — confirm.
+    o1.clear();
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleComparerCompiler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleComparerCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleComparerCompiler.java
new file mode 100644
index 0000000..f7f113d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/TupleComparerCompiler.java
@@ -0,0 +1,318 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.codegen;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedInts;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalType;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.org.objectweb.asm.ClassWriter;
+import org.apache.tajo.org.objectweb.asm.Label;
+import org.apache.tajo.org.objectweb.asm.MethodVisitor;
+import org.apache.tajo.org.objectweb.asm.Opcodes;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.tuple.offheap.UnSafeTupleBytesComparator;
+import org.apache.tajo.util.UnsafeComparer;
+
+import java.lang.reflect.Constructor;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.apache.tajo.engine.codegen.TajoGeneratorAdapter.getInternalName;
+
+public class TupleComparerCompiler {
+  /** Local variable index used to load the first argument of the generated compare method. */
+  private static final int LEFT_VALUE = 1;
+  /** Local variable index used to load the second argument of the generated compare method. */
+  private static final int RIGHT_VALUE = 2;
+
+  /** Suffix appended to generated class names; incremented on every compile call. */
+  private static int classSeqId = 0;
+  /** Class loader that defines the generated classes. */
+  private final TajoClassLoader classLoader;
+
+  /**
+   * @param classLoader class loader used to define the generated comparator classes
+   */
+  public TupleComparerCompiler(TajoClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+
+  public TupleComparator compile(BaseTupleComparator comp, boolean ensureUnSafeTuple) {
+    ClassWriter classWriter = new ClassWriter(ClassWriter.COMPUTE_MAXS);
+
+    String className = TupleComparator.class.getPackage().getName() + ".TupleComparator" + classSeqId++;
+
+    emitClassDefinition(classWriter, getInternalName(className));
+    emitConstructor(classWriter);
+    emitCompare(classWriter, comp, ensureUnSafeTuple);
+
+    classWriter.visitEnd();
+
+    Class clazz = classLoader.defineClass(className, classWriter.toByteArray());
+    Constructor constructor;
+    TupleComparator compiled;
+
+    try {
+      constructor = clazz.getConstructor();
+      compiled = (TupleComparator) constructor.newInstance();
+    } catch (Throwable t) {
+      throw new CompilationError(comp, t, classWriter.toByteArray());
+    }
+    return compiled;
+  }
+
+  private void emitClassDefinition(ClassWriter classWriter, String generatedClassName) {
+    classWriter.visit(
+        Opcodes.V1_6,
+        Opcodes.ACC_PUBLIC,
+        generatedClassName,
+        null,
+        getInternalName(TupleComparator.class),
+        new String[]{}
+    );
+  }
+
+  /**
+   * Generation Constructor
+   *
+   * @param classWriter
+   */
+  private void emitConstructor(ClassWriter classWriter) {
+    MethodVisitor constructorMethod = classWriter.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
+    constructorMethod.visitVarInsn(Opcodes.ALOAD, 0);
+    constructorMethod.visitMethodInsn(Opcodes.INVOKESPECIAL,
+        getInternalName(TupleComparator.class),
+        "<init>",
+        "()V");
+    constructorMethod.visitInsn(Opcodes.RETURN);
+    constructorMethod.visitMaxs(0, 0);
+    constructorMethod.visitEnd();
+  }
+
+  /**
+   * Generates Comparator::compare(Tuple t1, Tuple t2).
+   *
+   * For each sort key, the null checks of both tuples are subtracted first; a non-zero result is returned.
+   * Otherwise, unless the null flag says to skip, a type-specific value comparison is emitted.
+   *
+   * @param classWriter writer that receives the generated compare method
+   * @param compImpl supplies the sort specs and sort key ids; ensureUnSafeTuple is passed on for TEXT keys
+   */
+  private void emitCompare(ClassWriter classWriter, BaseTupleComparator compImpl, boolean ensureUnSafeTuple) {
+
+    String methodName = "compare";
+    String methodDesc = TajoGeneratorAdapter.getMethodDescription(int.class, new Class[]{Tuple.class, Tuple.class});
+    MethodVisitor compMethod = classWriter.visitMethod(Opcodes.ACC_PUBLIC, methodName, methodDesc, null, null);
+    compMethod.visitCode();
+    compMethod.visitVarInsn(Opcodes.ALOAD, 0); // NOTE(review): loads `this`; nothing below appears to consume it — confirm
+
+    TajoGeneratorAdapter adapter =
+        new TajoGeneratorAdapter(Opcodes.ACC_PUBLIC, compMethod, methodName, methodDesc);
+
+    final Label returnLabel = new Label();
+
+    for (int idx = 0; idx < compImpl.getSortSpecs().length; idx++) {
+
+      if (idx > 0) {
+        // This check is skipped for the first sort key.
+        //
+        // if cmpVal == 0 {
+        //   (continue with this sort key)
+        // } else {
+        //   return cmpVal;
+        // }
+        adapter.dup();
+        compMethod.visitJumpInsn(Opcodes.IFNE, returnLabel);
+        compMethod.visitInsn(Opcodes.POP);
+      }
+
+      int nullFlag;
+
+      adapter.methodvisitor.visitVarInsn(Opcodes.ALOAD, 1);
+
+      adapter.push(compImpl.getSortKeyIds()[idx]);
+      if (compImpl.getSortSpecs()[idx].isNullFirst()) {
+        adapter.emitIsNotNullOfTuple();
+      } else {
+        adapter.emitIsNullOfTuple();
+      }
+      adapter.dup();
+      nullFlag = adapter.istore();
+
+      adapter.methodvisitor.visitVarInsn(Opcodes.ALOAD, 2);
+      adapter.push(compImpl.getSortKeyIds()[idx]);
+      if (compImpl.getSortSpecs()[idx].isNullFirst()) {
+        adapter.emitIsNotNullOfTuple();
+
+        adapter.dup();
+        adapter.iload(nullFlag);
+        compMethod.visitInsn(Opcodes.IAND);
+        compMethod.visitVarInsn(Opcodes.ISTORE, nullFlag);
+      } else {
+        adapter.emitIsNullOfTuple();
+
+        adapter.dup();
+        adapter.iload(nullFlag);
+        compMethod.visitInsn(Opcodes.IOR);
+        compMethod.visitVarInsn(Opcodes.ISTORE, nullFlag);
+      }
+
+      compMethod.visitInsn(Opcodes.ISUB); // difference of the two null-check results
+
+      adapter.dup();
+      compMethod.visitJumpInsn(Opcodes.IFNE, returnLabel);
+      adapter.pop();
+
+      Label nextComp = new Label();
+      Label pushComp = new Label();
+
+      // nullFlag indicates whether either value is null. If one of them is null, we skip reading the values.
+      // For an efficient null comparison, we subtract the results of isNull or isNotNull.
+      // We reuse those results as follows:
+      //
+      // <For Null First>
+      //
+      // if (left.isNotNull && right.isNotNull) == FALSE, we can be sure one of them is NULL.
+      //
+      // <For Null Last>
+      //
+      // if (left.isNull || right.isNull) != FALSE, we can be sure one of them is NULL.
+
+      adapter.iload(nullFlag);
+      if (compImpl.getSortSpecs()[idx].isNullFirst()) {
+        compMethod.visitJumpInsn(Opcodes.IFEQ, pushComp);
+      } else {
+        compMethod.visitJumpInsn(Opcodes.IFNE, pushComp);
+      }
+
+      SortSpec sortSpec = compImpl.getSortSpecs()[idx];
+      DataType dataType = sortSpec.getSortKey().getDataType();
+
+      if (dataType.getType() == Type.INET4) { // should be treated as unsigned integers
+        emitComparisonForUnsignedInts(adapter, compImpl, idx);
+      } else if (TajoGeneratorAdapter.isJVMInternalInt(dataType)) {
+        emitComparisonForJVMInteger(adapter, compImpl, idx);
+      } else if (TajoGeneratorAdapter.getWordSize(dataType) == 2 || dataType.getType() == Type.FLOAT4) {
+        emitComparisonForOtherPrimitives(adapter, compImpl, idx);
+      } else if (dataType.getType() == Type.TEXT) {
+        emitComparisonForText(adapter, compImpl, idx, ensureUnSafeTuple);
+      } else {
+        throw new UnsupportedException("Unknown sort type: " + dataType.getType().name());
+      }
+      compMethod.visitJumpInsn(Opcodes.GOTO, nextComp);
+
+      compMethod.visitLabel(pushComp); // push compVal manually to keep the stack height balanced
+      adapter.push(0);
+
+      compMethod.visitLabel(nextComp); // label for the next value comparison
+    }
+
+    compMethod.visitLabel(returnLabel);
+    compMethod.visitInsn(Opcodes.IRETURN);
+    compMethod.visitMaxs(1, 0); // presumably ignored: compile() creates the ClassWriter with COMPUTE_MAXS — confirm
+    compMethod.visitEnd();
+  }
+
+  private void emitGetParam(TajoGeneratorAdapter adapter, BaseTupleComparator c, int idx, int paramIdx) {
+    Preconditions.checkArgument(paramIdx == LEFT_VALUE || paramIdx == RIGHT_VALUE,
+        "Param Index must be either 1 or 2.");
+
+    // If sort order is a descending order, it switches left and right sides..
+    boolean asc = c.getSortSpecs()[idx].isAscending();
+    adapter.methodvisitor.visitVarInsn(Opcodes.ALOAD,
+        asc ? paramIdx : (paramIdx == LEFT_VALUE ? RIGHT_VALUE : LEFT_VALUE));
+  }
+
+  private void emitComparisonForJVMInteger(TajoGeneratorAdapter adapter, BaseTupleComparator c, int idx) {
+    emitGetParam(adapter, c, idx, LEFT_VALUE);
+    adapter.emitGetValueOfTuple(c.getSortSpecs()[idx].getSortKey().getDataType(), c.getSortKeyIds()[idx]);
+
+    emitGetParam(adapter, c, idx, RIGHT_VALUE);
+    adapter.emitGetValueOfTuple(c.getSortSpecs()[idx].getSortKey().getDataType(), c.getSortKeyIds()[idx]);
+
+    adapter.methodvisitor.visitInsn(Opcodes.ISUB);
+  }
+
+  private void emitComparisonForUnsignedInts(TajoGeneratorAdapter adapter, BaseTupleComparator c, int idx) {
+    emitGetParam(adapter, c, idx, LEFT_VALUE);
+    adapter.emitGetValueOfTuple(c.getSortSpecs()[idx].getSortKey().getDataType(), c.getSortKeyIds()[idx]);
+
+    emitGetParam(adapter, c, idx, RIGHT_VALUE);
+    adapter.emitGetValueOfTuple(c.getSortSpecs()[idx].getSortKey().getDataType(), c.getSortKeyIds()[idx]);
+
+    adapter.invokeStatic(UnsignedInts.class, "compare", int.class, new Class [] {int.class, int.class});
+  }
+
+  private void emitComparisonForOtherPrimitives(TajoGeneratorAdapter adapter, BaseTupleComparator comp, int idx) {
+    DataType dataType = comp.getSortSpecs()[idx].getSortKey().getDataType();
+    emitGetParam(adapter, comp, idx, LEFT_VALUE);
+    adapter.emitGetValueOfTuple(dataType, comp.getSortKeyIds()[idx]);
+    int lhs = adapter.store(dataType);
+
+    emitGetParam(adapter, comp, idx, RIGHT_VALUE);
+    adapter.emitGetValueOfTuple(dataType, comp.getSortKeyIds()[idx]);
+    int rhs = adapter.store(dataType);
+
+    Label equalLabel = new Label();
+    Label elseLabel = new Label();
+    Label returnLabel = new Label();
+
+    adapter.load(dataType, lhs);
+    adapter.load(dataType, rhs);
+    adapter.ifCmp(dataType, EvalType.LTH, equalLabel);
+    adapter.push(-1);
+    adapter.gotoLabel(returnLabel);
+
+    adapter.methodvisitor.visitLabel(equalLabel);
+    adapter.load(dataType, lhs);
+    adapter.load(dataType, rhs);
+    adapter.ifCmp(dataType, EvalType.EQUAL, elseLabel);
+    adapter.push(0);
+    adapter.gotoLabel(returnLabel);
+
+    adapter.methodvisitor.visitLabel(elseLabel);
+    adapter.push(1);
+    adapter.methodvisitor.visitLabel(returnLabel);
+  }
+
+  private void emitComparisonForText(TajoGeneratorAdapter adapter, BaseTupleComparator c, int idx,
+                                     boolean ensureUnSafeTuple) {
+    if (ensureUnSafeTuple) {
+      emitGetParam(adapter, c, idx, LEFT_VALUE);
+      adapter.methodvisitor.visitTypeInsn(Opcodes.CHECKCAST, getInternalName(UnSafeTuple.class));
+      adapter.push(c.getSortKeyIds()[idx]);
+      adapter.invokeVirtual(UnSafeTuple.class, "getFieldAddr", long.class, new Class[]{int.class});
+
+      emitGetParam(adapter, c, idx, RIGHT_VALUE);
+      adapter.methodvisitor.visitTypeInsn(Opcodes.CHECKCAST, getInternalName(UnSafeTuple.class));
+      adapter.push(c.getSortKeyIds()[idx]);
+      adapter.invokeVirtual(UnSafeTuple.class, "getFieldAddr", long.class, new Class[]{int.class});
+
+      adapter.invokeStatic(UnSafeTupleBytesComparator.class, "compare", int.class, new Class[]{long.class, long.class});
+    } else {
+      emitGetParam(adapter, c, idx, LEFT_VALUE);
+      adapter.push(c.getSortKeyIds()[idx]);
+      adapter.invokeInterface(Tuple.class, "getBytes", byte [].class, new Class [] {int.class});
+
+      emitGetParam(adapter, c, idx, RIGHT_VALUE);
+      adapter.push(c.getSortKeyIds()[idx]);
+      adapter.invokeInterface(Tuple.class, "getBytes", byte [].class, new Class [] {int.class});
+
+      adapter.invokeStatic(UnsafeComparer.class, "compareStatic", int.class, new Class[]{byte[].class, byte[].class});
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
index 493c098..7d0eb16 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgDouble.java
@@ -66,7 +66,7 @@ public class AvgDouble extends AggFunction {
   public void merge(FunctionContext ctx, Tuple part) {
     AvgContext avgCtx = (AvgContext) ctx;
     Datum d = part.get(0);
-    if (d instanceof NullDatum) {
+    if (d.isNull()) {
       return;
     }
     ProtobufDatum datum = (ProtobufDatum) d;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
index 4794f4d..70b930e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/AvgLong.java
@@ -61,7 +61,7 @@ public class AvgLong extends AggFunction<Float8Datum> {
   public void merge(FunctionContext ctx, Tuple part) {
     AvgContext avgCtx = (AvgContext) ctx;
     Datum d = part.get(0);
-    if (d instanceof NullDatum) {
+    if (d.isNull()) {
       return;
     }
     ProtobufDatum datum = (ProtobufDatum) d;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Coalesce.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Coalesce.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Coalesce.java
index 98abb02..e9ce4b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Coalesce.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/Coalesce.java
@@ -34,7 +34,7 @@ public abstract class Coalesce extends GeneralFunction {
     int paramSize = params.size();
     for (int i = 0; i < paramSize; i++) {
       Datum datum = params.get(i);
-      if (datum instanceof NullDatum) {
+      if (datum.isNull()) {
         continue;
       }
       return datum;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
index 12d8892..f2ca8e9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.function.builtin;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.function.annotation.Description;
 import org.apache.tajo.engine.function.annotation.ParamTypes;
@@ -47,7 +46,7 @@ public final class CountValue extends CountRows {
 
   @Override
   public void eval(FunctionContext ctx, Tuple params) {
-    if (!(params.get(0) instanceof NullDatum)) {
+    if (!(params.get(0).isNull())) {
       ((CountRowContext) ctx).count++;
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
index 9b5b190..e011efc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
@@ -26,7 +26,6 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.Float8Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.function.annotation.Description;
@@ -66,7 +65,7 @@ public class SumDoubleDistinct extends AggFunction<Datum> {
   public void merge(FunctionContext context, Tuple params) {
     SumContext distinctContext = (SumContext) context;
     Datum value = params.get(0);
-    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value.isNull()))) {
       distinctContext.latest = value;
       distinctContext.sum += value.asFloat8();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
index f230622..ab2ae27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
@@ -26,7 +26,6 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.Float8Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.function.annotation.Description;
@@ -66,7 +65,7 @@ public class SumFloatDistinct extends AggFunction<Datum> {
   public void merge(FunctionContext context, Tuple params) {
     SumContext distinctContext = (SumContext) context;
     Datum value = params.get(0);
-    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value.isNull()))) {
       distinctContext.latest = value;
       distinctContext.sum += value.asFloat4();
     }


Mime
View raw message