tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/6] TAJO-1153: Merge off-heap package in block_iteration branch to master branch.
Date Tue, 04 Nov 2014 12:32:36 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 167e4a8..bfbe478 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
@@ -68,7 +69,12 @@ public class LazyTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -199,6 +205,11 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
   public char[] getUnicodeChars(int fieldId) {
     return get(fieldId).asUnicodeChars();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 70044ca..24b6280 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.exception.UnknownDataTypeException;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.BitArray;
 
 import java.nio.ByteBuffer;
@@ -177,7 +179,8 @@ public class RowStoreUtil {
       nullFlags = new BitArray(schema.size());
       headerSize = nullFlags.bytesLength();
     }
-    public byte [] toBytes(Tuple tuple) {
+
+    public byte[] toBytes(Tuple tuple) {
       nullFlags.clear();
       int size = estimateTupleDataSize(tuple);
       ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
@@ -191,42 +194,64 @@ public class RowStoreUtil {
 
         col = schema.getColumn(i);
         switch (col.getDataType().getType()) {
-          case NULL_TYPE: nullFlags.set(i); break;
-          case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
-          case BIT: bb.put(tuple.get(i).asByte()); break;
-          case CHAR: bb.put(tuple.get(i).asByte()); break;
-          case INT2: bb.putShort(tuple.get(i).asInt2()); break;
-          case INT4: bb.putInt(tuple.get(i).asInt4()); break;
-          case INT8: bb.putLong(tuple.get(i).asInt8()); break;
-          case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
-          case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
-          case TEXT:
-            byte [] _string = tuple.get(i).asByteArray();
-            bb.putInt(_string.length);
-            bb.put(_string);
-            break;
-          case DATE: bb.putInt(tuple.get(i).asInt4()); break;
-          case TIME:
-          case TIMESTAMP:
-            bb.putLong(tuple.get(i).asInt8());
-            break;
-          case INTERVAL:
-            IntervalDatum interval = (IntervalDatum) tuple.get(i);
-            bb.putInt(interval.getMonths());
-            bb.putLong(interval.getMilliSeconds());
-            break;
-          case BLOB:
-            byte [] bytes = tuple.get(i).asByteArray();
-            bb.putInt(bytes.length);
-            bb.put(bytes);
-            break;
-          case INET4:
-            byte [] ipBytes = tuple.get(i).asByteArray();
-            bb.put(ipBytes);
-            break;
-          case INET6: bb.put(tuple.get(i).asByteArray()); break;
-          default:
-            throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        case NULL_TYPE:
+          nullFlags.set(i);
+          break;
+        case BOOLEAN:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case BIT:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case CHAR:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case INT2:
+          bb.putShort(tuple.get(i).asInt2());
+          break;
+        case INT4:
+          bb.putInt(tuple.get(i).asInt4());
+          break;
+        case INT8:
+          bb.putLong(tuple.get(i).asInt8());
+          break;
+        case FLOAT4:
+          bb.putFloat(tuple.get(i).asFloat4());
+          break;
+        case FLOAT8:
+          bb.putDouble(tuple.get(i).asFloat8());
+          break;
+        case TEXT:
+          byte[] _string = tuple.get(i).asByteArray();
+          bb.putInt(_string.length);
+          bb.put(_string);
+          break;
+        case DATE:
+          bb.putInt(tuple.get(i).asInt4());
+          break;
+        case TIME:
+        case TIMESTAMP:
+          bb.putLong(tuple.get(i).asInt8());
+          break;
+        case INTERVAL:
+          IntervalDatum interval = (IntervalDatum) tuple.get(i);
+          bb.putInt(interval.getMonths());
+          bb.putLong(interval.getMilliSeconds());
+          break;
+        case BLOB:
+          byte[] bytes = tuple.get(i).asByteArray();
+          bb.putInt(bytes.length);
+          bb.put(bytes);
+          break;
+        case INET4:
+          byte[] ipBytes = tuple.get(i).asByteArray();
+          bb.put(ipBytes);
+          break;
+        case INET6:
+          bb.put(tuple.get(i).asByteArray());
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
         }
       }
 
@@ -237,7 +262,7 @@ public class RowStoreUtil {
 
       bb.position(finalPosition);
       bb.flip();
-      byte [] buf = new byte [bb.limit()];
+      byte[] buf = new byte[bb.limit()];
       bb.get(buf);
       return buf;
     }
@@ -254,24 +279,38 @@ public class RowStoreUtil {
 
         col = schema.getColumn(i);
         switch (col.getDataType().getType()) {
-          case BOOLEAN:
-          case BIT:
-          case CHAR: size += 1; break;
-          case INT2: size += 2; break;
-          case DATE:
-          case INT4:
-          case FLOAT4: size += 4; break;
-          case TIME:
-          case TIMESTAMP:
-          case INT8:
-          case FLOAT8: size += 8; break;
-          case INTERVAL: size += 12; break;
-          case TEXT:
-          case BLOB: size += (4 + tuple.get(i).asByteArray().length); break;
-          case INET4:
-          case INET6: size += tuple.get(i).asByteArray().length; break;
-          default:
-            throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        case BOOLEAN:
+        case BIT:
+        case CHAR:
+          size += 1;
+          break;
+        case INT2:
+          size += 2;
+          break;
+        case DATE:
+        case INT4:
+        case FLOAT4:
+          size += 4;
+          break;
+        case TIME:
+        case TIMESTAMP:
+        case INT8:
+        case FLOAT8:
+          size += 8;
+          break;
+        case INTERVAL:
+          size += 12;
+          break;
+        case TEXT:
+        case BLOB:
+          size += (4 + tuple.get(i).asByteArray().length);
+          break;
+        case INET4:
+        case INET6:
+          size += tuple.get(i).asByteArray().length;
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
         }
       }
 
@@ -284,4 +323,55 @@ public class RowStoreUtil {
       return schema;
     }
   }
+
+  public static void convert(Tuple tuple, RowWriter writer) {
+    writer.startRow();
+
+    for (int i = 0; i < writer.dataTypes().length; i++) {
+      if (tuple.isNull(i)) {
+        writer.skipField();
+        continue;
+      }
+      switch (writer.dataTypes()[i].getType()) {
+      case BOOLEAN:
+        writer.putBool(tuple.getBool(i));
+        break;
+      case INT1:
+      case INT2:
+        writer.putInt2(tuple.getInt2(i));
+        break;
+      case INT4:
+      case DATE:
+      case INET4:
+        writer.putInt4(tuple.getInt4(i));
+        break;
+      case INT8:
+      case TIMESTAMP:
+      case TIME:
+        writer.putInt8(tuple.getInt8(i));
+        break;
+      case FLOAT4:
+        writer.putFloat4(tuple.getFloat4(i));
+        break;
+      case FLOAT8:
+        writer.putFloat8(tuple.getFloat8(i));
+        break;
+      case TEXT:
+        writer.putText(tuple.getBytes(i));
+        break;
+      case INTERVAL:
+        writer.putInterval((IntervalDatum) tuple.getInterval(i));
+        break;
+      case PROTOBUF:
+        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
+        break;
+      case NULL_TYPE:
+        writer.skipField();
+        break;
+      default:
+        throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]);
+      }
+    }
+    writer.endRow();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 084c105..8dffd8d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -1,4 +1,4 @@
-/**
+/***
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,164 +18,15 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
 
 import java.util.Comparator;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
 import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
 
-/**
- * The Comparator class for Tuples
- * 
- * @see Tuple
- */
-public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
-  private final int[] sortKeyIds;
-  private final boolean[] asc;
-  @SuppressWarnings("unused")
-  private final boolean[] nullFirsts;  
-
-  /**
-   * @param schema The schema of input tuples
-   * @param sortKeys The description of sort keys
-   */
-  public TupleComparator(Schema schema, SortSpec[] sortKeys) {
-    Preconditions.checkArgument(sortKeys.length > 0, 
-        "At least one sort key must be specified.");
-
-    this.sortKeyIds = new int[sortKeys.length];
-    this.asc = new boolean[sortKeys.length];
-    this.nullFirsts = new boolean[sortKeys.length];
-    for (int i = 0; i < sortKeys.length; i++) {
-      if (sortKeys[i].getSortKey().hasQualifier()) {
-        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
-      } else {
-        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
-      }
-          
-      this.asc[i] = sortKeys[i].isAscending();
-      this.nullFirsts[i]= sortKeys[i].isNullFirst();
-    }
-  }
-
-  public TupleComparator(TupleComparatorProto proto) {
-    this.sortKeyIds = new int[proto.getCompSpecsCount()];
-    this.asc = new boolean[proto.getCompSpecsCount()];
-    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
-
-    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
-      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
-      sortKeyIds[i] = sortSepcProto.getColumnId();
-      asc[i] = sortSepcProto.getAscending();
-      nullFirsts[i] = sortSepcProto.getNullFirst();
-    }
-  }
-
-  public boolean isAscendingFirstKey() {
-    return this.asc[0];
-  }
-
-  @Override
-  public int compare(Tuple tuple1, Tuple tuple2) {
-    Datum left = null;
-    Datum right = null;
-    int compVal = 0;
-
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      left = tuple1.get(sortKeyIds[i]);
-      right = tuple2.get(sortKeyIds[i]);
-
-      if (left instanceof NullDatum || right instanceof NullDatum) {
-        if (!left.equals(right)) {
-          if (left instanceof NullDatum) {
-            compVal = 1;
-          } else if (right instanceof NullDatum) {
-            compVal = -1;
-          }
-          if (nullFirsts[i]) {
-            if (compVal != 0) {
-              compVal *= -1;
-            }
-          }
-        } else {
-          compVal = 0;
-        }
-      } else {
-        if (asc[i]) {
-          compVal = left.compareTo(right);
-        } else {
-          compVal = right.compareTo(left);
-        }
-      }
-
-      if (compVal < 0 || compVal > 0) {
-        return compVal;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(sortKeyIds);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof TupleComparator) {
-      TupleComparator other = (TupleComparator) obj;
-      if (sortKeyIds.length != other.sortKeyIds.length) {
-        return false;
-      }
-
-      for (int i = 0; i < sortKeyIds.length; i++) {
-        if (sortKeyIds[i] != other.sortKeyIds[i] ||
-            asc[i] != other.asc[i] ||
-            nullFirsts[i] != other.nullFirsts[i]) {
-          return false;
-        }
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public TupleComparatorProto getProto() {
-    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
-    TupleComparatorSpecProto.Builder sortSpecBuilder;
-
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
-      sortSpecBuilder.setColumnId(sortKeyIds[i]);
-      sortSpecBuilder.setAscending(asc[i]);
-      sortSpecBuilder.setNullFirst(nullFirsts[i]);
-      builder.addCompSpecs(sortSpecBuilder);
-    }
-
-    return builder.build();
-  }
+public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
 
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
+  public abstract int compare(Tuple o1, Tuple o2);
 
-    String prefix = "";
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
-        .append(",Asc=").append(asc[i])
-        .append(",NullFirst=").append(nullFirsts[i]);
-      prefix = " ,";
-    }
-    return sb.toString();
-  }
-}
\ No newline at end of file
+  public abstract boolean isAscendingFirstKey();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
index 6cc09d4..e824b99 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -33,7 +33,7 @@ public class TupleRange implements Comparable<TupleRange>, Cloneable {
   private final TupleComparator comp;
 
   public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
-    this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
+    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
     // if there is only one value, start == end
     this.start = start;
     this.end = end;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
index 74be7ff..ccba3be 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage.index;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.TupleComparator;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 5d43bd5..f093f9d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -27,12 +27,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.IndexMethod;
 import org.apache.tajo.storage.index.IndexWriter;
 import org.apache.tajo.storage.index.OrderIndexReader;
@@ -72,8 +69,7 @@ public class BSTIndex implements IndexMethod {
   }
 
   @Override
-  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema,
-      TupleComparator comparator) throws IOException {
+  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
     return new BSTIndexReader(fileName, keySchema, comparator);
   }
 
@@ -350,7 +346,7 @@ public class BSTIndex implements IndexMethod {
 
       TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
       compProto.mergeFrom(compBytes);
-      this.comparator = new TupleComparator(compProto.build());
+      this.comparator = new BaseTupleComparator(compProto.build());
 
       // level
       this.level = indexIn.readInt();

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..c1835df
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,112 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.HeapTuple;
+import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
+
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  // buffer
+  private ByteBuffer buffer;
+  private long address;
+
+  public BaseTupleBuilder(Schema schema) {
+    super(SchemaUtil.toDataTypes(schema));
+    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
+    address = UnsafeUtil.getAddress(buffer);
+  }
+
+  @Override
+  public long address() {
+    return address;
+  }
+
+  public void ensureSize(int size) {
+    if (buffer.remaining() - size < 0) { // check the remain size
+      // allocate a larger buffer and copy the data written so far
+      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
+      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
+      long newAddress = ((DirectBuffer)newByteBuf).address();
+      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
+      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+
+      // release existing buffer and replace variables
+      UnsafeUtil.free(buffer);
+      buffer = newByteBuf;
+      address = newAddress;
+    }
+  }
+
+  @Override
+  public int position() {
+    return 0;
+  }
+
+  @Override
+  public void forward(int length) {
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    buffer.position(0).limit(offset());
+  }
+
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  public HeapTuple buildToHeapTuple() {
+    byte [] bytes = new byte[buffer.limit()];
+    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
+    return new HeapTuple(bytes, dataTypes());
+  }
+
+  public ZeroCopyTuple buildToZeroCopyTuple() {
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
+    return zcTuple;
+  }
+
+  public void release() {
+    UnsafeUtil.free(buffer);
+    buffer = null;
+    address = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..be734e1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Return for each tuple
+   *
+   * @return True if the tuple block is filled with tuples. Otherwise, it will return false.
+   */
+  public boolean next(T tuple);
+
+  public void reset();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..c43c018
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+public interface TupleBuilder extends RowWriter {
+  public Tuple build();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
new file mode 100644
index 0000000..9662d5a
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  private ByteBuffer bb;
+
+  public DirectBufTuple(int length, DataType[] types) {
+    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
+    set(bb, 0, length, types);
+  }
+
+  @Override
+  public void release() {
+    UnsafeUtil.free(bb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..a327123
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+/**
+ * Fixed size limit specification
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+  public FixedSizeLimitSpec(long size) {
+    super(size, size);
+  }
+
+  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
+    super(size, size, allowedOverflowRatio);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
new file mode 100644
index 0000000..e38555c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -0,0 +1,269 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.sun.tools.javac.util.Convert;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A read-only {@link Tuple} that decodes its fields directly from an on-heap byte array
+ * holding one serialized row.
+ *
+ * The field offset table is read starting 4 bytes into the array (one 4-byte offset per
+ * field); a field whose offset equals {@link OffHeapRowBlock#NULL_FIELD_OFFSET} is null.
+ * All offsets are relative to the beginning of the array.
+ */
+public class HeapTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
+
+  /** Serialized row. */
+  private final byte [] data;
+  /** Data type of each field, indexed by field id. */
+  private final DataType [] types;
+
+  /**
+   * @param bytes Serialized row; the array is used as is, it is not copied
+   * @param types Data types of the fields, indexed by field id
+   */
+  public HeapTuple(final byte [] bytes, final DataType [] types) {
+    this.data = bytes;
+    this.types = types;
+  }
+
+  /**
+   * Returns the length in bytes of the backing array.
+   *
+   * NOTE(review): this is a byte length, not the number of fields; confirm that callers of
+   * {@link Tuple#size()} expect that.
+   */
+  @Override
+  public int size() {
+    return data.length;
+  }
+
+  /** Wraps the backing array (without copying) into a {@link ByteBuffer}. */
+  public ByteBuffer nioBuffer() {
+    return ByteBuffer.wrap(data);
+  }
+
+  /** Reads the offset of the given field from the offset table that follows the first 4 bytes. */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /**
+   * Returns the offset of the given field.
+   *
+   * @throws RuntimeException if the field is null
+   */
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  // This tuple is read-only, so every put variant is rejected.
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(Datum []).");
+  }
+
+  /**
+   * Decodes the given field into a {@link Datum} according to its data type.
+   *
+   * @return The decoded datum, or {@link NullDatum} if the field is null
+   * @throws UnsupportedException if the data type of the field is not handled here
+   */
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      // Must read all 8 bytes; reading it as INT4 would truncate the value.
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    // the offset is not tracked by this tuple
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  // Each getter below throws RuntimeException if the field is null.
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /**
+   * Reads a variable-length field: a 4-byte length followed by that many bytes.
+   *
+   * @return A fresh copy of the field bytes
+   */
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /**
+   * Decodes a text field with {@link TextDatum#DEFAULT_CHARSET}, the charset the row writer
+   * uses to encode strings, instead of the platform default charset.
+   */
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  /** Reads an interval field: a 4-byte month count followed by an 8-byte millisecond count. */
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * Parses the field bytes into a protobuf message of the type registered for the field's code.
+   *
+   * @return The parsed datum, or {@link NullDatum} if the bytes are not a valid message
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      // NOTE(review): a malformed message is silently reported as null; confirm this is intended.
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  /**
+   * Returns the characters of a text field. Decoding goes through {@link #getText(int)} so that
+   * it does not depend on the JDK-internal compiler class com.sun.tools.javac.util.Convert.
+   */
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return getText(fieldId).toCharArray();
+  }
+
+  /** Returns this instance; the backing array and the types are never reassigned. */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return this;
+  }
+
+  /**
+   * Decodes every field of this tuple.
+   *
+   * @return One datum per field ({@link NullDatum} for fields without a value)
+   */
+  @Override
+  public Datum[] getValues() {
+    // Iterate over the number of fields; size() is the byte length of the backing array and
+    // would make this loop read offsets that do not exist.
+    final int fieldNum = types.length;
+    Datum [] datums = new Datum[fieldNum];
+    for (int i = 0; i < fieldNum; i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A resizable chunk of memory backed by a direct {@link ByteBuffer}. The native address of
+ * the buffer is exposed for access through {@link Unsafe}.
+ */
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  protected long address;
+
+  /**
+   * @param buffer Direct buffer to use as the memory; its limit becomes the memory size
+   * @param limitSpec Limit consulted when the memory is resized
+   */
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  /** Allocates a native-order direct buffer of the initial size given by the limit spec. */
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  public long address() {
+    return address;
+  }
+
+  public long size() {
+    return memorySize;
+  }
+
+  /**
+   * Replaces the underlying buffer with a larger one and copies the current contents into it.
+   * A request to shrink the memory is logged and ignored.
+   *
+   * @param newSize New size in bytes
+   * @throws IllegalArgumentException if newSize is not positive
+   * @throws RuntimeException if newSize exceeds the limit of the limit spec
+   */
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      LOG.warn("The size reduction is ignored.");
+      // Really ignore it: going on would copy memorySize bytes into a smaller buffer.
+      return;
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    // A new direct buffer is big-endian by default; keep the native order that the
+    // allocating constructor sets so the byte order does not change after a resize.
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+    long newAddress = ((DirectBuffer)newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  /**
+   * Returns the underlying buffer itself (not a copy) after setting its position to 0 and
+   * its limit to the memory size.
+   */
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  /** Frees the underlying buffer and resets the address and the size to zero. */
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.buffer);
+    this.buffer = null;
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A block of serialized rows kept in an {@link OffHeapMemory}. Rows are appended through the
+ * writer returned by {@link #getWriter()} and iterated through {@link #getReader()}.
+ */
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  /** Offset value recorded for a field that holds no value. */
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  /** Data types of the fields, derived from the schema. */
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  /** Number of rows currently held by this block. */
+  private int rowNum;
+  /** Byte position just after the last completed row. */
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  /** Wraps an existing direct buffer. */
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  /** Allocates a new block according to the given limit spec. */
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  /** Derives the field data types from the schema and creates the writer bound to this block. */
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  /** Resets the position, the row count, and the writer state. The memory itself is kept. */
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  /**
+   * Returns the underlying buffer itself (not a copy) after setting its position to 0 and
+   * its limit to the current position of this block.
+   */
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size.
+   * Creates and copies to a new buffer if necessary
+   *
+   * NOTE(review): the block is enlarged at most once per call; whether a single increase is
+   * always enough for the requested size depends on the limit spec -- confirm.
+   *
+   * @param size Size to add
+   * @throws RuntimeException if the limit spec does not allow the block to grow
+   */
+  public void ensureSize(int size) {
+    if (remain() - size < 0) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  /** Memory size minus the position of this block minus the current offset of the writer. */
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+  // Number of rows currently held by this block.
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  /**
+   * Refills this block from the channel. The block is cleared, the channel is read once into
+   * the buffer, and the records are then walked using the 4-byte length at the start of each
+   * record, advancing the position and the row count for every complete record.
+   *
+   * If the trailing bytes do not form a complete record, the channel position is moved back
+   * by the number of remaining bytes and memorySize is reduced by the same amount, so the
+   * incomplete record is read again by the next call.
+   *
+   * NOTE(review): memorySize is overwritten with the number of bytes read; the return value
+   * of channel.read is not checked; a record length of zero would never advance the
+   * position. The stats parameter is not used here.
+   *
+   * @return false if the channel position is already at or beyond the channel size,
+   *         true otherwise
+   */
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() < channel.size()) {
+      clear();
+
+      buffer.clear();
+      channel.read(buffer);
+      memorySize = buffer.position();
+
+      while (position < memorySize) {
+        long recordPtr = address + position;
+
+        // Not even the record length fits in what is left.
+        if (remain() < SizeOf.SIZE_OF_INT) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        int recordSize = UNSAFE.getInt(recordPtr);
+
+        // The record is only partially present.
+        if (remain() < recordSize) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        position += recordSize;
+        rowNum++;
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /** Returns the writer bound to this block; the same instance is returned on every call. */
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  /** Creates a new reader over this block. */
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..4a9313f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
@@ -0,0 +1,63 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+/**
+ * Sequential reader over the rows of an {@link OffHeapRowBlock}. Each call to
+ * {@link #next(ZeroCopyTuple)} points the given tuple at the next record of the block.
+ */
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  OffHeapRowBlock rowBlock;
+
+  /** Index of the row that the next call to next() returns. */
+  private int nextRowIndex;
+  /** Byte position, relative to the block start, of that row. */
+  private int readPosition;
+
+  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
+    this.rowBlock = rowBlock;
+  }
+
+  /** Memory size of the block minus the current read position. */
+  public long remainForRead() {
+    return rowBlock.memorySize - readPosition;
+  }
+
+  /**
+   * Points the tuple at the next record and advances past it.
+   *
+   * @param tuple Tuple to be set to the next record
+   * @return false if all rows of the block have been read, true otherwise
+   */
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (nextRowIndex >= rowBlock.rows()) {
+      return false;
+    }
+
+    // The first 4 bytes of a record hold its total length.
+    final int recordLength = UNSAFE.getInt(rowBlock.address() + readPosition);
+    tuple.set(rowBlock.buffer, readPosition, recordLength, rowBlock.dataTypes);
+
+    readPosition += recordLength;
+    nextRowIndex++;
+    return true;
+  }
+
+  /** Rewinds this reader to the first row. */
+  @Override
+  public void reset() {
+    readPosition = 0;
+    nextRowIndex = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..dbc3188
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Sorting helpers for {@link OffHeapRowBlock}. Every row is exposed through its own
+ * {@link ZeroCopyTuple}, and the tuples (not the underlying bytes) are sorted.
+ */
+public class OffHeapRowBlockUtils {
+
+  /**
+   * Reads every row of the block and returns the tuples sorted by the comparator.
+   *
+   * @param rowBlock Block whose rows are sorted
+   * @param comparator Ordering to apply
+   * @return A new list holding one tuple per row, in sorted order
+   */
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final List<Tuple> sorted = Lists.newArrayList();
+    final OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    // A fresh tuple is needed per row because the reader repoints the tuple it is given.
+    for (ZeroCopyTuple row = new ZeroCopyTuple(); reader.next(row); row = new ZeroCopyTuple()) {
+      sorted.add(row);
+    }
+
+    Collections.sort(sorted, comparator);
+    return sorted;
+  }
+
+  /**
+   * Reads every row of the block and returns the tuples sorted by the comparator.
+   *
+   * @param rowBlock Block whose rows are sorted
+   * @param comparator Ordering to apply
+   * @return A new array holding one tuple per row, in sorted order
+   */
+  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final Tuple[] sorted = new Tuple[rowBlock.rows()];
+    final OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple row = new ZeroCopyTuple();
+    int filled = 0;
+    while (filled < rowBlock.rows() && reader.next(row)) {
+      sorted[filled] = row;
+      filled++;
+      row = new ZeroCopyTuple();
+    }
+
+    Arrays.sort(sorted, comparator);
+    return sorted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..d177e0c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+/**
+ * Row writer that appends rows to an {@link OffHeapRowBlock}. The address, the position, and
+ * the capacity checks are all delegated to the block.
+ */
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  /** Start address of the memory of the block. */
+  @Override
+  public long address() {
+    return rowBlock.address();
+  }
+
+  /** Current position of the block. */
+  @Override
+  public int position() {
+    return rowBlock.position();
+  }
+
+  /** Moves the position of the block forward by the given number of bytes. */
+  @Override
+  public void forward(int length) {
+    final int advanced = position() + length;
+    rowBlock.position(advanced);
+  }
+
+  @Override
+  public void ensureSize(int size) {
+    rowBlock.ensureSize(size);
+  }
+
+  /** Completes the row and increments the row count of the block. */
+  @Override
+  public void endRow() {
+    super.endRow();
+    final int rowCount = rowBlock.rows() + 1;
+    rowBlock.setRows(rowCount);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.dataTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..85c7e0b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ *
+ * Row Record Structure
+ *
+ * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
+ *                              4 bytes          4 bytes               4 bytes
+ *
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** record size + offset list */
+  private final int headerSize;
+  /** field offsets */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  /** Index of the next field to be written in the current row. */
+  private int curFieldIdx;
+  /** Number of bytes of the current row used so far, including the header. */
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    fieldOffsets = new int[dataTypes.length];
+    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  /** Resets the row state of this writer. */
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+  }
+
+  /** Absolute address at which the current row starts. */
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  /** Start address of the memory being written to. */
+  public abstract long address();
+
+  /**
+   * Makes sure that the given number of bytes can be added to the current row.
+   *
+   * @param size Number of bytes about to be written
+   */
+  public abstract void ensureSize(int size);
+
+  /** Number of bytes of the current row used so far, including the header. */
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Forward the address;
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  /**
+   * Begins a new row and reserves the room for its header.
+   *
+   * @return Always true
+   */
+  public boolean startRow() {
+    curFieldIdx = 0;
+    // Reserve the header up front: endRow() writes it unconditionally, and a row in which
+    // no put method is called would otherwise never be checked against the capacity.
+    // The offset is zeroed first so that the bytes of the previous row are not counted
+    // by implementations whose capacity check takes offset() into account.
+    curOffset = 0;
+    ensureSize(headerSize);
+    curOffset = headerSize;
+    return true;
+  }
+
+  /**
+   * Completes the current row: writes the row length and the field offsets into the header,
+   * marks every field that was not written as null, and moves the position past the row.
+   */
+  public void endRow() {
+    long rowHeaderPos = address() + position();
+    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
+    rowHeaderPos += SizeOf.SIZE_OF_INT;
+
+    for (int i = 0; i < curFieldIdx; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+    // Fields that were neither written nor skipped are recorded as null.
+    for (int i = curFieldIdx; i < dataTypes.length; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+
+    // rowOffset is equivalent to a byte length of this row.
+    forward(curOffset);
+  }
+
+  /** Marks the next field as null and moves on to the following field. */
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  /** Records the current offset as the start of the next field. */
+  private void forwardField() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  /** Writes a 4-byte length followed by the bytes themselves as the next field. */
+  private void putLengthPrefixedBytes(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  /** Writes a boolean as a single byte: 0x01 for true, 0x00 for false. */
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
+
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  /** Encodes the string with {@link TextDatum#DEFAULT_CHARSET} and writes the bytes. */
+  public void putText(String val) {
+    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
+    putText(bytes);
+  }
+
+  /** Writes already encoded text as a 4-byte length followed by the bytes. */
+  public void putText(byte[] val) {
+    putLengthPrefixedBytes(val);
+  }
+
+  /** Writes a blob as a 4-byte length followed by the bytes. */
+  public void putBlob(byte[] val) {
+    putLengthPrefixedBytes(val);
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  /** Writes an interval as a 4-byte month count followed by an 8-byte millisecond count. */
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    long offset = recordStartAddr() + curOffset;
+    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
+    offset += SizeOf.SIZE_OF_INT;
+    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  /** Writes the serialized form of the protobuf datum as a blob. */
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * Describes how a resizable buffer may grow: its initial size, its upper limit
+ * (including an allowed overflow margin), and the ratio by which it grows.
+ * In addition, it guarantees that all sizes are less than or equal to
+ * Integer.MAX_VALUE (2^31 - 1) due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  // One logger for the whole class; there is no need to look it up per instance.
+  private static final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOverflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  /**
+   * Creates a spec with the given initial size, a limit of {@link #MAX_SIZE_BYTES},
+   * and the default overflow and increase ratios.
+   *
+   * @param initSize initial size in bytes
+   */
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  /**
+   * Creates a spec with the given initial size and limit, and the default overflow and increase ratios.
+   *
+   * @param initSize initial size in bytes
+   * @param limitBytes limit in bytes, before the overflow margin is added
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  /**
+   * Creates a spec with the given initial size, limit and overflow ratio, and the default increase ratio.
+   *
+   * @param initSize initial size in bytes
+   * @param limitBytes limit in bytes, before the overflow margin is added
+   * @param allowedOverflow fraction of the limit that is added on top of it
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  /**
+   * Creates a fully specified spec. The effective limit is
+   * {@code limitBytes + limitBytes * allowedOverflowRatio}, clamped to Integer.MAX_VALUE.
+   * When {@code initSize == limitBytes}, the same enlarged value is used for both the
+   * initial size and the limit.
+   *
+   * @param initSize initial size in bytes; must be in (0, Integer.MAX_VALUE]
+   * @param limitBytes limit in bytes, before the overflow margin; must be in (0, Integer.MAX_VALUE]
+   * @param allowedOverflowRatio fraction of the limit that is added on top of it
+   * @param incRatio growth ratio used by {@link #increasedSize(int)}; must be greater than 0
+   * @throws IllegalArgumentException if any of the checked arguments is out of range
+   */
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      // Fixed-size spec: the overflow margin enlarges the initial size and the limit together.
+      long overflowedSize = clampToMax((long) (initSize + (initSize * allowedOverflowRatio)));
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      this.limitBytes = clampToMax((long) (limitBytes + (limitBytes * allowedOverflowRatio)));
+    }
+
+    this.allowedOverflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  /** Caps {@code size} at Integer.MAX_VALUE. */
+  private static long clampToMax(long size) {
+    return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : size;
+  }
+
+  /**
+   * @return the initial size in bytes
+   */
+  public long initialSize() {
+    return initSize;
+  }
+
+  /**
+   * @return the effective limit in bytes (overflow margin included, never above Integer.MAX_VALUE)
+   */
+  public long limit() {
+    return limitBytes;
+  }
+
+  /**
+   * Returns {@code currentSize / limit} as a float, i.e. the fraction of the limit that
+   * {@code currentSize} represents. Sizes above Integer.MAX_VALUE are treated as Integer.MAX_VALUE.
+   *
+   * @param currentSize current size in bytes; must be greater than 0
+   * @return the ratio of the current size to the limit
+   * @throws IllegalArgumentException if {@code currentSize} is not greater than 0
+   */
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  /**
+   * @param currentSize current size in bytes; must be greater than 0
+   * @return true if {@code currentSize} is still below the limit
+   * @throws IllegalArgumentException if {@code currentSize} is not greater than 0
+   */
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  /**
+   * @param currentSize current size in bytes; must be greater than 0
+   * @return the number of bytes left between {@code currentSize} and the limit (may be negative)
+   * @throws IllegalArgumentException if {@code currentSize} is not greater than 0
+   */
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    // The constructor already clamps limitBytes to Integer.MAX_VALUE, so no further clamping is needed.
+    return limitBytes - currentSize;
+  }
+
+  /**
+   * Computes the next size for a buffer of {@code currentSize} bytes: the initial size if the
+   * buffer is still smaller than that, otherwise {@code currentSize + currentSize * incRatio},
+   * capped at the limit.
+   *
+   * @param currentSize current size in bytes
+   * @return the next size in bytes, never above the limit
+   */
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    // limitBytes never exceeds Integer.MAX_VALUE, so capping at the limit also keeps the result an int.
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * Writes rows one at a time, field by field. For each row the calls should come in this order:
+ *
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * Within one row, the skipField and putXXX invocations together must add up to the number of fields.
+ */
+public interface RowWriter {
+
+  /** Returns the data types this writer works with (presumably one per field, in field order). */
+  TajoDataTypes.DataType[] dataTypes();
+
+  /**
+   * Begins a new row.
+   *
+   * @return a flag whose meaning is defined by the implementation
+   */
+  boolean startRow();
+
+  /** Finishes the row begun by {@link #startRow()}. */
+  void endRow();
+
+  /** Moves past the current field without putting a value. */
+  void skipField();
+
+  /** Puts a boolean value. */
+  void putBool(boolean val);
+
+  /** Puts a 2-byte integer value. */
+  void putInt2(short val);
+
+  /** Puts a 4-byte integer value. */
+  void putInt4(int val);
+
+  /** Puts an 8-byte integer value. */
+  void putInt8(long val);
+
+  /** Puts a 4-byte floating point value. */
+  void putFloat4(float val);
+
+  /** Puts an 8-byte floating point value. */
+  void putFloat8(double val);
+
+  /** Puts a text value given as a String. */
+  void putText(String val);
+
+  /** Puts a text value given as bytes. */
+  void putText(byte[] val);
+
+  /** Puts a blob value. */
+  void putBlob(byte[] val);
+
+  /** Puts a timestamp value given as a long. */
+  void putTimestamp(long val);
+
+  /** Puts a time value given as a long. */
+  void putTime(long val);
+
+  /** Puts a date value given as an int. */
+  void putDate(int val);
+
+  /** Puts an interval value. */
+  void putInterval(IntervalDatum val);
+
+  /** Puts an INET4 value given as an int. */
+  void putInet4(int val);
+
+  /** Puts a protobuf datum. */
+  void putProtoDatum(ProtobufDatum datum);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..138386f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,308 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.sun.tools.javac.util.Convert;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private DirectBuffer bb;
+  private int relativePos;
+  private int length;
+  private DataType [] types;
+
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    ((ByteBuffer) bb).clear();
+    if (length < tuple.length) {
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      this.relativePos = 0;
+      this.length = tuple.length;
+    }
+
+    ((ByteBuffer) bb).put(tuple.nioBuffer());
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt4(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return new String(bytes);
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return Convert.utf2chars(bytes);
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly accesses UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  /**
+   * Compares two length-prefixed byte sequences lexicographically, treating every byte as unsigned.
+   * Each pointer must address a 4-byte length immediately followed by that many bytes.
+   *
+   * @param ptr1 memory address of the left sequence
+   * @param ptr2 memory address of the right sequence
+   * @return a negative value, zero, or a positive value if the left sequence is less than,
+   *         equal to, or greater than the right one; when one sequence is a prefix of the
+   *         other, the shorter one is the lesser
+   */
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+     * time is no slower than comparing 4 bytes at a time even on 32-bit.
+     * On the other hand, it is substantially faster on 64-bit.
+     */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search to find the bit offset (n) of the least-significant differing byte.
+        // On a little-endian machine that byte is the first differing byte in memory.
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    // Bytes are masked to 0..255 so the tail uses the same unsigned ordering as the
+    // 8-byte loop above; a signed comparison would order bytes >= 0x80 differently
+    // depending on where the first difference happens to fall.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int lb = UNSAFE.getByte(ptr1++) & 0xFF;
+      int rb = UNSAFE.getByte(ptr2++) & 0xFF;
+      if (lb != rb) {
+        return lb - rb;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * An {@link UnSafeTuple} that exposes {@code set} publicly and has nothing to release.
+ */
+public class ZeroCopyTuple extends UnSafeTuple {
+
+  /**
+   * Points this tuple at a region of the given buffer by delegating to the superclass.
+   *
+   * @param bb the buffer holding the tuple
+   * @param relativePos start of the tuple within the buffer, in bytes
+   * @param length length of the tuple in bytes
+   * @param types the data type of each field
+   */
+  @Override
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  /** Does nothing. */
+  @Override
+  public void release() {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
index bcb0cbe..f5c8a08 100644
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ b/tajo-storage/src/main/proto/IndexProtos.proto
@@ -25,5 +25,7 @@ option java_generate_equals_and_hash = true;
 import "CatalogProtos.proto";
 
 message TupleComparatorProto {
-  repeated TupleComparatorSpecProto compSpecs = 1;
+  required SchemaProto schema = 1;
+  repeated SortSpecProto sortSpecs = 2;
+  repeated TupleComparatorSpecProto compSpecs = 3;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
index ab7c2b2..639ca04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -69,7 +69,7 @@ public class TestTupleComparator {
     SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
     SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
 
-    TupleComparator tc = new TupleComparator(schema,
+    BaseTupleComparator tc = new BaseTupleComparator(schema,
         new SortSpec[] {sortKey1, sortKey2});
     assertEquals(-1, tc.compare(tuple1, tuple2));
     assertEquals(1, tc.compare(tuple2, tuple1));


Mime
View raw message