tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [4/4] tajo git commit: TAJO-1738: Improve off-heap RowBlock.
Date Thu, 03 Sep 2015 12:45:34 GMT
TAJO-1738: Improve off-heap RowBlock.

Closes #712


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3a30f45c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3a30f45c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3a30f45c

Branch: refs/heads/master
Commit: 3a30f45c7befca5baf1aca59eda76ad8231ba005
Parents: 8c50410
Author: Jinho Kim <jhkim@apache.org>
Authored: Thu Sep 3 21:44:09 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Thu Sep 3 21:44:09 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-cli/pom.xml                                |   4 +
 .../org/apache/tajo/storage/RowStoreUtil.java   | 140 ++---
 tajo-cluster-tests/pom.xml                      |  20 +
 .../org/apache/tajo/storage/BufferPool.java     | 142 +++++
 .../org/apache/tajo/tuple/BaseTupleBuilder.java |  84 +++
 .../org/apache/tajo/tuple/RowBlockReader.java   |  35 ++
 .../org/apache/tajo/tuple/TupleBuilder.java     |  26 +
 .../tajo/tuple/memory/DirectBufTuple.java       |  42 ++
 .../tajo/tuple/memory/FixedSizeLimitSpec.java   |  32 +
 .../tajo/tuple/memory/HeapRowBlockReader.java   |  70 +++
 .../org/apache/tajo/tuple/memory/HeapTuple.java | 300 ++++++++++
 .../apache/tajo/tuple/memory/MemoryBlock.java   | 155 +++++
 .../tajo/tuple/memory/MemoryRowBlock.java       | 174 ++++++
 .../tuple/memory/OffHeapRowBlockReader.java     |  77 +++
 .../tajo/tuple/memory/OffHeapRowBlockUtils.java | 141 +++++
 .../tuple/memory/OffHeapRowBlockWriter.java     |  63 ++
 .../tajo/tuple/memory/OffHeapRowWriter.java     | 305 ++++++++++
 .../tajo/tuple/memory/ResizableLimitSpec.java   | 142 +++++
 .../tajo/tuple/memory/ResizableMemoryBlock.java | 231 +++++++
 .../org/apache/tajo/tuple/memory/RowBlock.java  |  48 ++
 .../org/apache/tajo/tuple/memory/RowWriter.java |  80 +++
 .../apache/tajo/tuple/memory/UnSafeTuple.java   | 342 +++++++++++
 .../memory/UnSafeTupleBytesComparator.java      |  99 +++
 .../apache/tajo/tuple/memory/ZeroCopyTuple.java |  48 ++
 .../java/org/apache/tajo/util/UnsafeUtil.java   |  10 +-
 .../apache/tajo/tuple/TestBaseTupleBuilder.java |  80 +++
 .../apache/tajo/tuple/memory/TestHeapTuple.java |  82 +++
 .../tajo/tuple/memory/TestMemoryRowBlock.java   | 595 +++++++++++++++++++
 .../tajo/tuple/memory/TestResizableSpec.java    |  59 ++
 tajo-core-tests/pom.xml                         |  16 +
 tajo-jdbc/pom.xml                               |   4 +
 tajo-storage/tajo-storage-common/pom.xml        |   8 +
 .../org/apache/tajo/storage/BufferPool.java     | 125 ----
 .../org/apache/tajo/storage/RowStoreUtil.java   |  54 --
 .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 ----
 .../org/apache/tajo/tuple/RowBlockReader.java   |  33 -
 .../org/apache/tajo/tuple/TupleBuilder.java     |  26 -
 .../tajo/tuple/offheap/DirectBufTuple.java      |  41 --
 .../tajo/tuple/offheap/FixedSizeLimitSpec.java  |  32 -
 .../apache/tajo/tuple/offheap/HeapTuple.java    | 292 ---------
 .../tajo/tuple/offheap/OffHeapMemory.java       | 102 ----
 .../tajo/tuple/offheap/OffHeapRowBlock.java     | 213 -------
 .../tuple/offheap/OffHeapRowBlockReader.java    |  63 --
 .../tuple/offheap/OffHeapRowBlockUtils.java     |  54 --
 .../tuple/offheap/OffHeapRowBlockWriter.java    |  58 --
 .../tajo/tuple/offheap/OffHeapRowWriter.java    | 232 --------
 .../tajo/tuple/offheap/ResizableLimitSpec.java  | 142 -----
 .../apache/tajo/tuple/offheap/RowWriter.java    |  73 ---
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  | 331 -----------
 .../offheap/UnSafeTupleBytesComparator.java     |  99 ---
 .../tajo/tuple/offheap/ZeroCopyTuple.java       |  35 --
 .../src/main/resources/storage-default.xml      |  20 +-
 .../apache/tajo/tuple/TestBaseTupleBuilder.java |  76 ---
 .../tajo/tuple/offheap/TestHeapTuple.java       |  45 --
 .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 ------------------
 .../tajo/tuple/offheap/TestResizableSpec.java   |  59 --
 .../src/test/resources/storage-default.xml      |  22 +-
 tajo-storage/tajo-storage-hbase/pom.xml         |  10 +
 .../storage/rawfile/DirectRawFileScanner.java   |  75 ++-
 .../storage/rawfile/DirectRawFileWriter.java    | 100 +---
 .../tajo/storage/text/ByteBufLineReader.java    |   4 +-
 .../org/apache/tajo/storage/TestStorages.java   | 138 ++---
 .../tajo/storage/raw/TestDirectRawFile.java     |  21 +-
 .../src/test/resources/storage-default.xml      |  33 +-
 65 files changed, 3804 insertions(+), 3149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 350ba1e..d306497 100644
--- a/CHANGES
+++ b/CHANGES
@@ -34,6 +34,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1738: Improve off-heap RowBlock. (jinho)
+
     TAJO-1810: Remove QueryMasterTask cache immediately, if it stored to 
     persistent storage. (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-cli/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-cli/pom.xml b/tajo-cli/pom.xml
index 83d7056..8de3c54 100644
--- a/tajo-cli/pom.xml
+++ b/tajo-cli/pom.xml
@@ -168,6 +168,10 @@
           <artifactId>hadoop-mapreduce-client-core</artifactId>
           <groupId>org.apache.hadoop</groupId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 4c7fe0a..bc23af8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -70,7 +70,7 @@ public class RowStoreUtil {
     }
 
 
-    public Tuple toTuple(byte [] bytes) {
+    public Tuple toTuple(byte[] bytes) {
       nullFlags.clear();
       ByteBuffer bb = ByteBuffer.wrap(bytes);
       Tuple tuple = new VTuple(schema.size());
@@ -81,7 +81,7 @@ public class RowStoreUtil {
       nullFlags.fromByteBuffer(bb);
       bb.limit(bytes.length);
 
-      for (int i =0; i < schema.size(); i++) {
+      for (int i = 0; i < schema.size(); i++) {
         if (nullFlags.get(i)) {
           tuple.put(i, DatumFactory.createNullDatum());
           continue;
@@ -90,73 +90,75 @@ public class RowStoreUtil {
         col = schema.getColumn(i);
         type = col.getDataType();
         switch (type.getType()) {
-          case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
-          case BIT:
-            byte b = bb.get();
-            tuple.put(i, DatumFactory.createBit(b));
-            break;
-
-          case CHAR:
-            byte [] _str = new byte[type.getLength()];
-            bb.get(_str);
-            tuple.put(i, DatumFactory.createChar(_str));
-            break;
-
-          case INT2:
-            short s = bb.getShort();
-            tuple.put(i, DatumFactory.createInt2(s));
-            break;
-
-          case INT4:
-          case DATE:
-            int i_ = bb.getInt();
-            tuple.put(i, DatumFactory.createFromInt4(type, i_));
-            break;
-
-          case INT8:
-          case TIME:
-          case TIMESTAMP:
-            long l = bb.getLong();
-            tuple.put(i, DatumFactory.createFromInt8(type, l));
-            break;
-
-          case INTERVAL:
-            int month  = bb.getInt();
-            long milliseconds  = bb.getLong();
-            tuple.put(i, new IntervalDatum(month, milliseconds));
-            break;
-
-          case FLOAT4:
-            float f = bb.getFloat();
-            tuple.put(i, DatumFactory.createFloat4(f));
-            break;
-
-          case FLOAT8:
-            double d = bb.getDouble();
-            tuple.put(i, DatumFactory.createFloat8(d));
-            break;
-
-          case TEXT:
-            byte [] _string = new byte[bb.getInt()];
-            bb.get(_string);
-            tuple.put(i, DatumFactory.createText(_string));
-            break;
-
-          case BLOB:
-            byte [] _bytes = new byte[bb.getInt()];
-            bb.get(_bytes);
-            tuple.put(i, DatumFactory.createBlob(_bytes));
-            break;
-
-          case INET4:
-            byte [] _ipv4 = new byte[4];
-            bb.get(_ipv4);
-            tuple.put(i, DatumFactory.createInet4(_ipv4));
-            break;
-
-          default:
-            throw new TajoRuntimeException(
-                new UnsupportedException("data type '" + col.getDataType().getType().name() + "'"));
+        case BOOLEAN:
+          tuple.put(i, DatumFactory.createBool(bb.get()));
+          break;
+        case BIT:
+          byte b = bb.get();
+          tuple.put(i, DatumFactory.createBit(b));
+          break;
+
+        case CHAR:
+          byte[] _str = new byte[type.getLength()];
+          bb.get(_str);
+          tuple.put(i, DatumFactory.createChar(_str));
+          break;
+
+        case INT2:
+          short s = bb.getShort();
+          tuple.put(i, DatumFactory.createInt2(s));
+          break;
+
+        case INT4:
+        case DATE:
+          int i_ = bb.getInt();
+          tuple.put(i, DatumFactory.createFromInt4(type, i_));
+          break;
+
+        case INT8:
+        case TIME:
+        case TIMESTAMP:
+          long l = bb.getLong();
+          tuple.put(i, DatumFactory.createFromInt8(type, l));
+          break;
+
+        case INTERVAL:
+          int month = bb.getInt();
+          long milliseconds = bb.getLong();
+          tuple.put(i, new IntervalDatum(month, milliseconds));
+          break;
+
+        case FLOAT4:
+          float f = bb.getFloat();
+          tuple.put(i, DatumFactory.createFloat4(f));
+          break;
+
+        case FLOAT8:
+          double d = bb.getDouble();
+          tuple.put(i, DatumFactory.createFloat8(d));
+          break;
+
+        case TEXT:
+          byte[] _string = new byte[bb.getInt()];
+          bb.get(_string);
+          tuple.put(i, DatumFactory.createText(_string));
+          break;
+
+        case BLOB:
+          byte[] _bytes = new byte[bb.getInt()];
+          bb.get(_bytes);
+          tuple.put(i, DatumFactory.createBlob(_bytes));
+          break;
+
+        case INET4:
+          byte[] _ipv4 = new byte[4];
+          bb.get(_ipv4);
+          tuple.put(i, DatumFactory.createInet4(_ipv4));
+          break;
+
+        default:
+          throw new TajoRuntimeException(
+              new UnsupportedException("data type '" + col.getDataType().getType().name() + "'"));
         }
       }
       return tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-cluster-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/pom.xml b/tajo-cluster-tests/pom.xml
index 2958586..2d879d9 100644
--- a/tajo-cluster-tests/pom.xml
+++ b/tajo-cluster-tests/pom.xml
@@ -213,6 +213,10 @@
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -240,6 +244,10 @@
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -264,12 +272,24 @@
       <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>${hbase.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
new file mode 100644
index 0000000..3120083
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.*;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.CommonTestingUtil;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/* this class is PooledBuffer holder */
+public class BufferPool {
+
+  public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache";
+  private static final ByteBufAllocator ALLOCATOR;
+
+  private BufferPool() {
+  }
+
+  static {
+    /* TODO Enable thread cache
+    *  Create a pooled ByteBuf allocator but disables the thread-local cache.
+    *  Because the TaskRunner thread is newly created
+    * */
+
+    if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+      /* Disable pooling buffers for memory usage  */
+      ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
+
+      /* if you are finding memory leak, please enable this line */
+      ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+    } else {
+      TajoConf tajoConf = new TajoConf();
+      ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0);
+    }
+  }
+
+  /**
+   * borrowed from Spark
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores == 0) {
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+        allowDirectBufs && PlatformDependent.directBufferPreferred(),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+        Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+        getPrivateStaticField("DEFAULT_MAX_ORDER"),
+        allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+        allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
+
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static long maxDirectMemory() {
+    return PlatformDependent.maxDirectMemory();
+  }
+
+
+  public static ByteBuf directBuffer(int size) {
+    return ALLOCATOR.directBuffer(size);
+  }
+
+  /**
+   *
+   * @param size the initial capacity
+   * @param max the max capacity
+   * @return allocated ByteBuf from pool
+   */
+  public static ByteBuf directBuffer(int size, int max) {
+    return ALLOCATOR.directBuffer(size, max).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  /**
+   *
+   * @param size the initial capacity
+   * @param max the max capacity
+   * @return heap ByteBuf
+   */
+  public static ByteBuf heapBuffer(int size, int max) {
+    return Unpooled.buffer(size, max).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @InterfaceStability.Unstable
+  public static void forceRelease(ByteBuf buf) {
+    buf.release(buf.refCnt());
+  }
+
+  /**
+   * the ByteBuf will increase to writable size
+   * @param buf
+   * @param minWritableBytes required minimum writable size
+   */
+  public static ByteBuf ensureWritable(ByteBuf buf, int minWritableBytes) {
+    return buf.ensureWritable(minWritableBytes).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  /**
+   * deallocate the specified direct
+   * @param byteBuffer
+   */
+  public static void free(ByteBuffer byteBuffer) {
+    PlatformDependent.freeDirectBuffer(byteBuffer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..04a0267
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,84 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.memory.*;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+
+/**
+ * Builds one row at a time into a {@link ResizableMemoryBlock} and turns it into a tuple.
+ *
+ * <p>The row layout itself is written by {@link OffHeapRowWriter}; this class supplies the
+ * backing memory. {@link #startRow()} rewinds the writer position to 0, so the same memory is
+ * reused for every row. Call {@link #release()} when the builder is no longer needed.
+ */
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+
+  private final MemoryBlock memoryBlock;
+
+  /**
+   * @param schema data types of the columns of the rows to build
+   */
+  public BaseTupleBuilder(DataType[] schema) {
+    super(schema);
+    // Starts from a 64 KB limit spec. The boolean argument presumably selects direct
+    // (off-heap) memory -- TODO confirm against ResizableMemoryBlock.
+    this.memoryBlock = new ResizableMemoryBlock(new ResizableLimitSpec(64 * StorageUnit.KB), true);
+  }
+
+  @Override
+  public long address() {
+    return memoryBlock.address();
+  }
+
+  /**
+   * Delegates to {@link MemoryBlock#ensureSize(int)}. It presumably guarantees room for
+   * {@code size} more bytes; confirm against the MemoryBlock implementation.
+   */
+  public void ensureSize(int size) {
+    memoryBlock.ensureSize(size);
+  }
+
+  /** @return the current writer position within the memory block */
+  @Override
+  public int position() {
+    return memoryBlock.writerPosition();
+  }
+
+  /** Advances the writer position by {@code length} bytes. */
+  @Override
+  public void forward(int length) {
+    memoryBlock.writerPosition(memoryBlock.writerPosition() + length);
+  }
+
+  /** Rewinds the writer position to 0 so the new row overwrites the previous one. */
+  @Override
+  public boolean startRow() {
+    memoryBlock.writerPosition(0);
+    return super.startRow();
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+  }
+
+  /** @return the current row as a {@link HeapTuple} */
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  /** @return the current row converted via {@link UnSafeTuple#toHeapTuple()} */
+  public HeapTuple buildToHeapTuple() {
+    return buildToZeroCopyTuple().toHeapTuple();
+  }
+
+  /**
+   * Returns a tuple pointing at the readable region of this builder's memory block; no copy is
+   * made. Its contents are presumably invalidated by the next {@link #startRow()} or by
+   * {@link #release()}.
+   */
+  public UnSafeTuple buildToZeroCopyTuple() {
+    UnSafeTuple zcTuple = new UnSafeTuple();
+    zcTuple.set(memoryBlock, memoryBlock.readerPosition(), memoryBlock.readableBytes(), dataTypes());
+    return zcTuple;
+  }
+
+  /** Releases the backing memory block. */
+  @Override
+  public void release() {
+    memoryBlock.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..59cdef5
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Sequential reader over the rows of a row block.
+ *
+ * @param <T> the tuple type that rows are read into
+ */
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Reads the next row of the block into {@code tuple}.
+   *
+   * @param tuple the tuple to point at, or fill with, the next row
+   * @return true if a row was read into {@code tuple}; false if no rows remain
+   */
+  boolean next(T tuple);
+
+  /**
+   * Rewinds the reader to the first row of the block.
+   */
+  void reset();
+
+  /**
+   * @return how much data remains to be read; presumably a byte count (e.g. the readable
+   *         bytes of the underlying memory block) -- confirm per implementation
+   */
+  long remainForRead();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..5b4bd80
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.memory.RowWriter;
+
+/**
+ * A {@link RowWriter} that can materialize the row it has written as a {@link Tuple}.
+ */
+public interface TupleBuilder extends RowWriter {
+  /**
+   * @return a tuple holding the most recently written row
+   */
+  Tuple build();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
new file mode 100644
index 0000000..10e493f
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/DirectBufTuple.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.util.Deallocatable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * An {@link UnSafeTuple} backed by its own freshly allocated, little-endian direct
+ * {@link ByteBuffer} of exactly {@code length} bytes. Call {@link #release()} to release the
+ * backing memory block.
+ */
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  /** Block wrapping the direct buffer this tuple points at. */
+  private MemoryBlock memoryBlock;
+
+  /**
+   * Allocates a direct buffer of {@code length} bytes and points this tuple at all of it.
+   *
+   * @param length size of the backing buffer in bytes
+   * @param types column data types of the tuple
+   */
+  public DirectBufTuple(int length, DataType[] types) {
+    final ByteBuffer directBuf = ByteBuffer.allocateDirect(length);
+    directBuf.order(ByteOrder.LITTLE_ENDIAN);
+    this.memoryBlock = new ResizableMemoryBlock(directBuf);
+    set(this.memoryBlock, 0, length, types);
+  }
+
+  /** Releases the memory block backing this tuple. */
+  @Override
+  public void release() {
+    this.memoryBlock.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..367d90d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+/**
+ * Limit specification for a block that is not meant to grow: the same {@code size} is passed
+ * for both size arguments of {@link ResizableLimitSpec} (presumably the initial and the limit
+ * size -- confirm against ResizableLimitSpec).
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+  /**
+   * @param size the fixed size, passed as both size arguments of the parent spec
+   */
+  public FixedSizeLimitSpec(long size) {
+    super(size, size);
+  }
+
+  /**
+   * @param size the fixed size, passed as both size arguments of the parent spec
+   * @param allowedOverflowRatio forwarded unchanged to {@link ResizableLimitSpec}
+   */
+  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
+    super(size, size, allowedOverflowRatio);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
new file mode 100644
index 0000000..dd377cf
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapRowBlockReader.java
@@ -0,0 +1,70 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.tuple.RowBlockReader;
+
+/**
+ * Sequentially reads the rows of a row block into {@link HeapTuple}s.
+ *
+ * <p>Each row must start with a 4-byte int holding the row's total length, including that int.
+ * The reader uses this length to step from one row to the next. It works on
+ * {@code memoryBlock.duplicate()}, presumably so that moving its reader position does not
+ * affect the source block -- confirm against MemoryBlock#duplicate.
+ */
+public class HeapRowBlockReader implements RowBlockReader<HeapTuple> {
+  private final DataType[] dataTypes;
+  private final MemoryBlock memoryBlock;
+  private final int rows;
+
+  // Read States
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  /**
+   * @param rowBlock the row block to read; its memory, data types and row count are used
+   */
+  public HeapRowBlockReader(MemoryRowBlock rowBlock) {
+    this(rowBlock.getMemory(), rowBlock.getDataTypes(), rowBlock.rows());
+  }
+
+  /**
+   * @param memoryBlock memory holding the rows, starting at offset 0
+   * @param dataTypes column data types of every row
+   * @param rows number of rows stored in {@code memoryBlock}
+   */
+  public HeapRowBlockReader(MemoryBlock memoryBlock, DataType[] dataTypes, int rows) {
+    this.dataTypes = dataTypes;
+    this.rows = rows;
+    this.memoryBlock = memoryBlock.duplicate();
+  }
+
+  /** @return the readable bytes left in this reader's memory block */
+  @Override
+  public long remainForRead() {
+    return memoryBlock.readableBytes();
+  }
+
+  /**
+   * Points {@code tuple} at the next row and advances past it.
+   *
+   * @return true if a row was read; false once all {@code rows} rows have been read
+   */
+  @Override
+  public boolean next(HeapTuple tuple) {
+    if (curRowIdxForRead < rows) {
+
+      // The first int of a row is the row's total length, so it also locates the next row.
+      int recordLen = memoryBlock.getInt(curPosForRead);
+      tuple.set(memoryBlock, curPosForRead, recordLen, dataTypes);
+
+      curPosForRead += recordLen;
+      curRowIdxForRead++;
+      memoryBlock.readerPosition(curPosForRead);
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /** Rewinds to the first row. */
+  @Override
+  public void reset() {
+    curPosForRead = 0;
+    curRowIdxForRead = 0;
+    memoryBlock.readerPosition(curPosForRead);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
new file mode 100644
index 0000000..5d2fdc9
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java
@@ -0,0 +1,300 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.datetime.TimeMeta;
+
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * Read-only {@link Tuple} view over one serialized row held in a Netty {@link ByteBuf}.
+ *
+ * <p>Row layout, with offsets relative to the row start:
+ * <pre>
+ * | row length (4 bytes) | field 0 offset (4 bytes) | ... | field N-1 offset | field data ... |
+ * </pre>
+ * A field offset equal to {@link MemoryRowBlock#NULL_FIELD_OFFSET} marks a null field.
+ * Variable-length values (text, blobs, protobufs) are stored as a 4-byte length followed by the
+ * payload bytes. All {@code put} methods throw, because this tuple only views existing bytes.
+ */
+public class HeapTuple extends ZeroCopyTuple implements Cloneable {
+  private ByteBuf buffer;
+  private DataType[] types;
+
+  /**
+   * Points this tuple at the row starting at {@code relativePos} in {@code memoryBlock}'s
+   * buffer. No bytes are copied.
+   */
+  @Override
+  public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) {
+    this.buffer = memoryBlock.getBuffer();
+    this.types = types;
+    super.set(relativePos, length);
+  }
+
+  /** Points this tuple at a serialized row held in {@code bytes}, read as little-endian. */
+  protected void set(final byte[] bytes, final DataType[] types) {
+    this.buffer = Unpooled.wrappedBuffer(bytes).order(ByteOrder.LITTLE_ENDIAN);
+    this.types = types;
+    super.set(0, bytes.length);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public TajoDataTypes.Type type(int fieldId) {
+    return types[fieldId].getType();
+  }
+
+  /**
+   * @return the 32-bit value stored at the start of the field's data; for variable-length
+   *         fields this is the payload length
+   */
+  @Override
+  public int size(int fieldId) {
+    return buffer.getInt(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public void clearOffset() {
+  }
+
+  /** Reads the field-offset table entry for {@code fieldId} (relative to the row start). */
+  private int getFieldOffset(int fieldId) {
+    return buffer.getInt(getRelativePos() + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /**
+   * @return the absolute buffer index where the field's data starts
+   * @throws RuntimeException if the field is null
+   */
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == MemoryRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset + getRelativePos();
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > MemoryRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isBlank(int fieldid) {
+    return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  /** A serialized row has a single null marker, so "blank" and "null" are the same check. */
+  @Override
+  public boolean isBlankOrNull(int fieldid) {
+    return isBlank(fieldid);
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do: the tuple owns no state beyond its pointer into the buffer
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  /**
+   * Materializes the field as a {@link Datum}; null fields yield {@link NullDatum}.
+   * INT1 is read and returned as a 2-byte INT2 value.
+   */
+  @Override
+  public Datum asDatum(int fieldId) {
+    if (isBlankOrNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case BIT:
+      return DatumFactory.createBit(getByte(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case CHAR:
+      return DatumFactory.createChar(getBytes(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getBytes(fieldId));
+    case BLOB :
+      return DatumFactory.createBlob(getBytes(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    case NULL_TYPE:
+      return NullDatum.get();
+    default:
+      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
+    }
+  }
+
+  /** No-op: this tuple has no offset of its own. */
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  /** @return always 0 */
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return buffer.getByte(checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return buffer.getByte(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return buffer.getChar(checkNullAndGetOffset(fieldId));
+  }
+
+  /** @return a fresh copy of the field's length-prefixed payload */
+  @Override
+  public byte[] getBytes(int fieldId) {
+    int pos = checkNullAndGetOffset(fieldId);
+    int len = buffer.getInt(pos);
+
+    byte[] bytes = new byte[len];
+    buffer.getBytes(pos + SizeOf.SIZE_OF_INT, bytes);
+    return bytes;
+  }
+
+  @Override
+  public byte[] getTextBytes(int fieldId) {
+    return asDatum(fieldId).asTextBytes();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return buffer.getShort(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return buffer.getInt(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return buffer.getLong(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return buffer.getFloat(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return buffer.getDouble(checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return asDatum(fieldId).asTimeMeta();
+  }
+
+  /** Reads an interval stored as a 32-bit month count followed by a 64-bit millisecond count. */
+  public IntervalDatum getInterval(int fieldId) {
+    int pos = checkNullAndGetOffset(fieldId);
+    int months = buffer.getInt(pos);
+    long millisecs = buffer.getLong(pos + SizeOf.SIZE_OF_INT);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * Parses the field's payload as the protobuf message registered for its type code.
+   * A payload that fails to parse is returned as {@link NullDatum} rather than thrown.
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      // NOTE(review): parse errors are deliberately mapped to NULL here; confirm callers rely on it
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  /** Decodes the field's payload with {@link TextDatum#DEFAULT_CHARSET}. */
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    // reuse getBytes instead of duplicating the length-prefixed read
+    return StringUtils.convertBytesToChars(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  /** @return every field as a {@link Datum}, with null fields as {@link NullDatum} */
+  @Override
+  public Datum[] getValues() {
+    Datum[] datums = new Datum[size()];
+    for (int i = 0; i < datums.length; i++) {
+      datums[i] = contains(i) ? asDatum(i) : NullDatum.get();
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  /**
+   * Copies this row's bytes into a new buffer so the clone no longer depends on the
+   * source buffer; the clone's row starts at position 0 of that copy.
+   */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    HeapTuple heapTuple = (HeapTuple) super.clone();
+    heapTuple.buffer = buffer.copy(getRelativePos(), getLength());
+    heapTuple.relativePos = 0;
+    return heapTuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java
new file mode 100644
index 0000000..cc4536d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryBlock.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.util.Deallocatable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A memory buffer backed by either off-heap (direct) or heap memory.
+ *
+ * <p>The buffer tracks a reader position and a writer position. Memory is given back through
+ * {@code release()}, inherited from {@link Deallocatable}.
+ */
+public interface MemoryBlock extends Deallocatable {
+
+  /**
+   * Current memory address of this buffer. Byte position {@code p} lives at
+   * {@code address() + p}; callers in this package check {@link #hasAddress()} before using it.
+   *
+   * @return the memory address
+   */
+  long address();
+
+  /**
+   * @return true if the buffer exposes a raw memory address usable via {@link #address()};
+   *         heap-backed blocks are expected to return false
+   */
+  boolean hasAddress();
+
+  /**
+   * @return the number of bytes this buffer can currently hold
+   */
+  int capacity();
+
+  /**
+   * Resets the buffer for reuse. Whether contents or capacity change is up to the implementation.
+   */
+  void clear();
+
+  /**
+   * @return true if this buffer has remaining readable bytes
+   */
+  boolean isReadable();
+
+  /**
+   * @return the number of readable bytes remaining in this buffer
+   */
+  int readableBytes();
+
+  /**
+   * @return the current reader position (byte index) of this buffer
+   */
+  int readerPosition();
+
+  /**
+   * Sets the reader position of this buffer.
+   *
+   * @param pos the new reader position (byte index)
+   */
+  void readerPosition(int pos);
+
+  /**
+   * @return true if this buffer is not yet full, i.e. more bytes can be written
+   */
+  boolean isWritable();
+
+  /**
+   * @return the number of bytes that can still be written to this buffer
+   */
+  int writableBytes();
+
+  /**
+   * @return the current writer position (byte index) of this buffer
+   */
+  int writerPosition();
+
+  /**
+   * Sets the writer position of this buffer.
+   *
+   * @param pos the new writer position (byte index)
+   */
+  void writerPosition(int pos);
+
+  /**
+   * Ensures that this buffer has room for at least {@code size} more bytes, creating and
+   * copying into a new buffer if necessary.
+   * NOTE(review): if a new buffer is created, previously obtained {@link #address()} values are
+   * presumably stale — confirm with the implementation.
+   *
+   * @param size number of additional bytes required
+   */
+  void ensureSize(int size);
+
+  /**
+   * Transfers bytes from the channel into this buffer.
+   *
+   * @param in the input channel
+   * @return the number of bytes actually read from the channel
+   * @throws IOException if reading from the channel fails
+   */
+  int writeBytes(ScatteringByteChannel in) throws IOException;
+
+  /**
+   * Copies bytes from this buffer into a byte array.
+   *
+   * @param dst      the destination byte array
+   * @param dstIndex the first index of the destination to write to
+   * @param length   the number of bytes to transfer
+   * @return the number of bytes actually transferred to the destination array
+   * @throws IOException if the transfer fails
+   */
+  int getBytes(byte[] dst, int dstIndex, int length) throws IOException;
+
+  /**
+   * Reads a 32-bit integer at an absolute byte index.
+   * This method does not modify {@code readerPosition} or {@code writerPosition} of this buffer.
+   *
+   * @param index byte index of the integer, counted from the start of the buffer
+   * @return a 32-bit integer in this buffer
+   */
+  int getInt(int index);
+
+  /**
+   * Transfers the content of this buffer to the channel.
+   *
+   * @param out    the output channel
+   * @param length the maximum number of bytes to transfer
+   * @return the number of bytes actually transferred to the channel
+   * @throws IOException if writing to the channel fails
+   */
+  int writeTo(GatheringByteChannel out, int length) throws IOException;
+
+  /**
+   * Transfers the content of this buffer to the channel without an explicit length limit
+   * (presumably all readable bytes — confirm with the implementation).
+   *
+   * @param out the output channel
+   * @return the number of bytes actually transferred to the channel
+   * @throws IOException if writing to the channel fails
+   */
+  int writeTo(GatheringByteChannel out) throws IOException;
+
+  /**
+   * Transfers the content of this buffer to the stream.
+   *
+   * @param out    the output stream
+   * @param length the maximum number of bytes to transfer
+   * @return the number of bytes actually transferred to the stream
+   * @throws IOException if writing to the stream fails
+   */
+  int writeTo(OutputStream out, int length) throws IOException;
+
+  /**
+   * Transfers the content of this buffer to the stream without an explicit length limit
+   * (presumably all readable bytes — confirm with the implementation).
+   *
+   * @param out the output stream
+   * @return the number of bytes actually transferred to the stream
+   * @throws IOException if writing to the stream fails
+   */
+  int writeTo(OutputStream out) throws IOException;
+
+  /**
+   * @return a MemoryBlock which shares the whole memory region of this one
+   */
+  MemoryBlock duplicate();
+
+  /**
+   * @return the underlying Netty buffer
+   */
+  ByteBuf getBuffer();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
new file mode 100644
index 0000000..922fc68
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/MemoryRowBlock.java
@@ -0,0 +1,174 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.nio.channels.ScatteringByteChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A block of serialized rows stored in a {@link MemoryBlock}.
+ *
+ * <p>Rows are stored back to back and each row starts with a 4-byte length covering the whole
+ * record. A field offset equal to {@link #NULL_FIELD_OFFSET} marks a null field.
+ */
+public class MemoryRowBlock implements RowBlock, Deallocatable {
+  /** Value stored in a row's field-offset table to mark a null field. */
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  private DataType[] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+
+  /** Created lazily by {@link #getWriter()}. */
+  private RowWriter builder;
+  private MemoryBlock memory;
+
+  /**
+   * Creates an empty row block backed by a {@link ResizableMemoryBlock}.
+   *
+   * @param dataTypes column types of the rows
+   * @param limitSpec sizing spec for the backing memory
+   * @param isDirect  passed to ResizableMemoryBlock; true requests direct memory
+   */
+  public MemoryRowBlock(DataType[] dataTypes, ResizableLimitSpec limitSpec, boolean isDirect) {
+    this.memory = new ResizableMemoryBlock(limitSpec, isDirect);
+    this.dataTypes = dataTypes;
+  }
+
+  /**
+   * Creates a row block sharing {@code rowBlock}'s memory region via
+   * {@link MemoryBlock#duplicate()}, with the same row count and column types.
+   * The duplicate must be a {@link ResizableMemoryBlock}.
+   */
+  public MemoryRowBlock(MemoryRowBlock rowBlock) {
+    this.memory = TUtil.checkTypeAndGet(rowBlock.getMemory().duplicate(), ResizableMemoryBlock.class);
+    this.rowNum = rowBlock.rowNum;
+    this.dataTypes = rowBlock.dataTypes;
+  }
+
+  /** Wraps existing memory that already holds {@code rowNum} serialized rows. */
+  public MemoryRowBlock(MemoryBlock memory, DataType[] dataTypes, int rowNum) {
+    this.memory = memory;
+    this.rowNum = rowNum;
+    this.dataTypes = dataTypes;
+  }
+
+  /** Creates an empty block with 64 KB of direct memory. */
+  public MemoryRowBlock(DataType[] dataTypes) {
+    this(dataTypes, new ResizableLimitSpec(64 * StorageUnit.KB), true);
+  }
+
+  /** Creates an empty block with {@code bytes} of direct memory. */
+  public MemoryRowBlock(DataType[] dataTypes, int bytes) {
+    this(dataTypes, new ResizableLimitSpec(bytes), true);
+  }
+
+  /** Creates an empty block with {@code bytes} of direct or heap memory. */
+  public MemoryRowBlock(DataType[] dataTypes, int bytes, boolean isDirect) {
+    this(dataTypes, new ResizableLimitSpec(bytes), isDirect);
+  }
+
+  /** Forgets all rows, clears the writer (if any) and clears the backing memory. */
+  @Override
+  public void clear() {
+    reset();
+    memory.clear();
+  }
+
+  private void reset() {
+    rowNum = 0;
+    if (builder != null) {
+      builder.clear();
+    }
+  }
+
+  @Override
+  public int capacity() {
+    return memory.capacity();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+
+  @Override
+  public int rows() {
+    return rowNum;
+  }
+
+  @Override
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  @Override
+  public DataType[] getDataTypes() {
+    return dataTypes;
+  }
+
+  /**
+   * Reads bytes from {@code channel} into the backing memory and counts the complete rows now
+   * available. A trailing partial row (or partial length prefix) is left unread.
+   *
+   * @return true if any bytes were read from the channel, false otherwise
+   * @throws IOException if the channel fails or a row header holds an impossible length
+   */
+  @Override
+  public boolean copyFromChannel(ScatteringByteChannel channel) throws IOException {
+    reset();
+
+    int readBytes = memory.writeBytes(channel);
+    if (readBytes <= 0) {
+      return false;
+    }
+
+    // Count complete rows; stop at the first incomplete length prefix or record.
+    while (memory.readableBytes() >= SizeOf.SIZE_OF_INT) {
+      int recordSize = readRecordSize(memory.readerPosition());
+      if (recordSize < SizeOf.SIZE_OF_INT) {
+        // A record always contains at least its own length prefix. A smaller value means the
+        // input is corrupt and would otherwise stall (0) or misalign (1..3) this loop.
+        throw new IOException("Invalid record size: " + recordSize);
+      }
+      if (memory.readableBytes() < recordSize) {
+        break;
+      }
+      memory.readerPosition(memory.readerPosition() + recordSize);
+      rowNum++;
+    }
+    return true;
+  }
+
+  /**
+   * Reads the 4-byte record length at {@code position}. Raw-address access is only valid when the
+   * memory exposes an address, so heap-backed memory goes through {@link MemoryBlock#getInt(int)}.
+   */
+  private int readRecordSize(int position) {
+    if (memory.hasAddress()) {
+      return PlatformDependent.getInt(memory.address() + position);
+    }
+    return memory.getInt(position);
+  }
+
+  /**
+   * Returns this block's writer, creating an {@link OffHeapRowBlockWriter} on first use.
+   *
+   * @throws TajoInternalError if the memory has no address (heap writer not implemented yet)
+   */
+  @Override
+  public RowWriter getWriter() {
+
+    if (builder == null) {
+      if (!getMemory().hasAddress()) {
+        throw new TajoInternalError(new NotImplementedException("Heap memory writer not implemented yet"));
+      } else {
+        this.builder = new OffHeapRowBlockWriter(this);
+      }
+    }
+    return builder;
+  }
+
+  @Override
+  public MemoryBlock getMemory() {
+    return memory;
+  }
+
+  /** Releases the backing memory. */
+  @Override
+  public void release() {
+    memory.release();
+  }
+
+  /**
+   * Creates a new reader: a {@link HeapRowBlockReader} when the memory has no address, otherwise an
+   * {@link OffHeapRowBlockReader}.
+   */
+  @Override
+  public RowBlockReader getReader() {
+    if (!getMemory().hasAddress()) {
+      return new HeapRowBlockReader(this);
+    } else {
+      return new OffHeapRowBlockReader(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..ccaeffc
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockReader.java
@@ -0,0 +1,77 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.tuple.RowBlockReader;
+
+/**
+ * Sequentially reads rows from a {@link MemoryBlock} that exposes a raw memory address.
+ *
+ * <p>This reader works on the given block directly (no duplicate), so advancing it also moves
+ * that block's reader position.
+ */
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private final DataType[] dataTypes;
+  private final MemoryBlock memoryBlock;
+  private final int rows;
+
+  // Read state: index of the next row to return and the byte offset where it starts
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  public OffHeapRowBlockReader(MemoryRowBlock rowBlock) {
+    this(rowBlock.getMemory(), rowBlock.getDataTypes(), rowBlock.rows());
+  }
+
+  /**
+   * @throws TajoInternalError if {@code memoryBlock} has no raw memory address
+   */
+  public OffHeapRowBlockReader(MemoryBlock memoryBlock, DataType[] dataTypes, int rows) {
+    if (!memoryBlock.hasAddress()) {
+      throw new TajoInternalError(memoryBlock.getClass().getSimpleName()
+          + " does not support to direct memory access");
+    }
+    this.memoryBlock = memoryBlock;
+    this.dataTypes = dataTypes;
+    this.rows = rows;
+  }
+
+  /** @return the readable bytes left in the block */
+  public long remainForRead() {
+    return memoryBlock.readableBytes();
+  }
+
+  /**
+   * Points {@code tuple} at the next row and advances past it.
+   *
+   * @return false once all {@code rows} rows have been returned
+   */
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (curRowIdxForRead >= rows) {
+      return false;
+    }
+
+    final int rowStart = curPosForRead;
+    final int rowLength = PlatformDependent.getInt(memoryBlock.address() + rowStart);
+    tuple.set(memoryBlock, rowStart, rowLength, dataTypes);
+
+    curRowIdxForRead++;
+    curPosForRead = rowStart + rowLength;
+    memoryBlock.readerPosition(curPosForRead);
+    return true;
+  }
+
+  /** Rewinds the reader (and the block's reader position) to the first row. */
+  @Override
+  public void reset() {
+    curRowIdxForRead = 0;
+    curPosForRead = 0;
+    memoryBlock.readerPosition(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..e8f219c
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.RowBlockReader;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Helpers for sorting the rows of a {@link MemoryRowBlock} and for serializing tuples into one.
+ */
+public class OffHeapRowBlockUtils {
+
+  /**
+   * Creates a tuple able to point into a row block's memory: an {@link UnSafeTuple} when the
+   * memory exposes a raw address, a {@link HeapTuple} otherwise.
+   */
+  private static ZeroCopyTuple newZeroCopyTuple(boolean hasAddress) {
+    if (hasAddress) {
+      return new UnSafeTuple();
+    }
+    return new HeapTuple();
+  }
+
+  /**
+   * Returns the block's rows as zero-copy tuples, sorted by {@code comparator}.
+   * The tuples reference the block's memory, so they are only valid while the block is.
+   */
+  public static List<Tuple> sort(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final boolean hasAddress = rowBlock.getMemory().hasAddress();
+    List<Tuple> tupleList = Lists.newArrayListWithCapacity(rowBlock.rows());
+
+    ZeroCopyTuple zcTuple = newZeroCopyTuple(hasAddress);
+    RowBlockReader reader = rowBlock.getReader();
+    while (reader.next(zcTuple)) {
+      tupleList.add(zcTuple);
+      // every collected tuple keeps its own row pointer, so the next row needs a fresh instance
+      zcTuple = newZeroCopyTuple(hasAddress);
+    }
+    Collections.sort(tupleList, comparator);
+    return tupleList;
+  }
+
+  /**
+   * Array variant of {@link #sort}. If the reader yields fewer than {@code rows()} rows, the
+   * returned array is shortened to the rows actually read, so the comparator never sees null.
+   */
+  public static Tuple[] sortToArray(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) {
+    final boolean hasAddress = rowBlock.getMemory().hasAddress();
+    Tuple[] tuples = new Tuple[rowBlock.rows()];
+
+    ZeroCopyTuple zcTuple = newZeroCopyTuple(hasAddress);
+    RowBlockReader reader = rowBlock.getReader();
+    int filled = 0;
+    while (filled < tuples.length && reader.next(zcTuple)) {
+      tuples[filled++] = zcTuple;
+      zcTuple = newZeroCopyTuple(hasAddress);
+    }
+    if (filled < tuples.length) {
+      tuples = Arrays.copyOf(tuples, filled);
+    }
+    Arrays.sort(tuples, comparator);
+    return tuples;
+  }
+
+  /**
+   * Serializes {@code tuple} as one row through {@code writer}, following the writer's column
+   * types. Null/blank fields and NULL_TYPE columns are skipped.
+   *
+   * @throws TajoRuntimeException for unsupported column types
+   */
+  public static void convert(Tuple tuple, RowWriter writer) {
+    final org.apache.tajo.common.TajoDataTypes.DataType[] types = writer.dataTypes();
+    writer.startRow();
+
+    for (int i = 0; i < types.length; i++) {
+      if (tuple.isBlankOrNull(i)) {
+        writer.skipField();
+        continue;
+      }
+      switch (types[i].getType()) {
+      case BOOLEAN:
+        writer.putBool(tuple.getBool(i));
+        break;
+      case BIT:
+        writer.putByte(tuple.getByte(i));
+        break;
+      case INT1:
+      case INT2:
+        writer.putInt2(tuple.getInt2(i));
+        break;
+      case INT4:
+      case DATE:
+      case INET4:
+        writer.putInt4(tuple.getInt4(i));
+        break;
+      case INT8:
+      case TIMESTAMP:
+      case TIME:
+        writer.putInt8(tuple.getInt8(i));
+        break;
+      case FLOAT4:
+        writer.putFloat4(tuple.getFloat4(i));
+        break;
+      case FLOAT8:
+        writer.putFloat8(tuple.getFloat8(i));
+        break;
+      case CHAR:
+      case TEXT:
+        writer.putText(tuple.getBytes(i));
+        break;
+      case BLOB:
+        writer.putBlob(tuple.getBytes(i));
+        break;
+      case INTERVAL:
+        writer.putInterval((IntervalDatum) tuple.getInterval(i));
+        break;
+      case PROTOBUF:
+        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
+        break;
+      case NULL_TYPE:
+        writer.skipField();
+        break;
+      default:
+        throw new TajoRuntimeException(
+            new UnsupportedException("unknown data type '" + types[i].getType().name() + "'"));
+      }
+    }
+    writer.endRow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..6832730
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.exception.TajoInternalError;
+
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  private RowBlock rowBlock;
+
+  OffHeapRowBlockWriter(RowBlock rowBlock) {
+    super(rowBlock.getDataTypes());
+    this.rowBlock = rowBlock;
+    if (!rowBlock.getMemory().hasAddress()) {
+      throw new TajoInternalError(rowBlock.getMemory().getClass().getSimpleName()
+          + " does not support to direct memory access");
+    }
+  }
+
+  public long address() {
+    return rowBlock.getMemory().address();
+  }
+
+  public int position() {
+    return rowBlock.getMemory().writerPosition();
+  }
+
+  @Override
+  public void forward(int length) {
+    rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() + length);
+  }
+
+  public void ensureSize(int size) {
+    rowBlock.getMemory().ensureSize(size);
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    rowBlock.setRows(rowBlock.rows() + 1);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.getDataTypes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
new file mode 100644
index 0000000..c9b233f
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * Base writer that serializes rows into raw memory through {@link PlatformDependent}.
+ *
+ * Row Record Structure
+ *
+ * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
+ *                              4 bytes          4 bytes               4 bytes
+ *
+ * Every field offset is measured from the first byte of the row. A field that is skipped, or
+ * never written before {@link #endRow()}, gets {@link MemoryRowBlock#NULL_FIELD_OFFSET}.
+ *
+ * Subclasses provide the base address, the write position, forwarding and buffer growth.
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** Row header size: the 4-byte row length followed by one 4-byte offset per field. */
+  private final int headerSize;
+
+  private final DataType[] dataTypes;
+
+  /** Index of the next field to be written. */
+  private int curFieldIdx;
+  /** Offset, from the row start, of the next field-offset slot in the header. */
+  private int curFieldOffset;
+  /** Bytes written so far for the current row (header included); also the next field's offset. */
+  private int curOffset;
+
+  public OffHeapRowWriter(final DataType[] dataTypes) {
+    this.dataTypes = dataTypes;
+    this.headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+    this.curFieldOffset = SizeOf.SIZE_OF_INT;
+  }
+
+  /**
+   * Memory address of the first byte of the row currently being written.
+   *
+   * @return The memory address
+   */
+  public long recordStartAddr() {
+    return currentAddr() - curOffset;
+  }
+
+  /**
+   * Memory address at the current write position, i.e. {@code address() + position()}.
+   *
+   * @return The memory address
+   */
+  private long currentAddr() {
+    return address() + position();
+  }
+
+  /**
+   * Base memory address of the underlying buffer (the address of position 0).
+   *
+   * @return The memory address
+   */
+  public abstract long address();
+
+  /**
+   * Called before {@code size} bytes are written at the current position; implementations are
+   * expected to make that much room.
+   *
+   * @param size Number of bytes about to be written
+   */
+  public abstract void ensureSize(int size);
+
+  /**
+   * Same value as {@link #position()}.
+   *
+   * @return The current write position
+   */
+  public int offset() {
+    return position();
+  }
+
+  /**
+   * Current write position, relative to {@link #address()}.
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Advances the write position.
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+    curFieldOffset = SizeOf.SIZE_OF_INT;
+  }
+
+  @Override
+  public DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  /**
+   * Reserves the row header and moves the write position past it, so that field values are
+   * appended right after the header.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean startRow() {
+    ensureSize(headerSize);
+    curOffset = headerSize;
+    curFieldOffset = SizeOf.SIZE_OF_INT;
+    curFieldIdx = 0;
+    forward(headerSize);
+    return true;
+  }
+
+  /**
+   * Finishes the current row. Writes the row length into the first header slot and marks every
+   * field that was not written as null.
+   */
+  @Override
+  public void endRow() {
+    long rowHeaderPos = recordStartAddr();
+    // curOffset is equivalent to a byte length of this row.
+    PlatformDependent.putInt(rowHeaderPos, curOffset);
+
+    // curFieldOffset is already measured from the row start: it begins at SIZE_OF_INT, just past
+    // the row-length slot. It therefore addresses the first unwritten field-offset slot directly.
+    // Adding another SIZE_OF_INT would skip that slot and write one slot past the header, over the
+    // first bytes of field data.
+    rowHeaderPos += curFieldOffset;
+    // mark the remaining fields as null
+    for (int i = curFieldIdx; i < dataTypes.length; i++) {
+      PlatformDependent.putInt(rowHeaderPos, MemoryRowBlock.NULL_FIELD_OFFSET);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+  }
+
+  @Override
+  public void skipField() {
+    // Record a null offset for this field; no data bytes are written.
+    putFieldHeader(currentAddr(), MemoryRowBlock.NULL_FIELD_OFFSET);
+  }
+
+  /**
+   * Advances both the buffer position and the in-row offset past a field that was just written.
+   *
+   * @param fieldLength Number of bytes the field occupies
+   */
+  private void forwardField(int fieldLength) {
+    forward(fieldLength);
+    curOffset += fieldLength;
+  }
+
+  /**
+   * Stores {@code length} in the next field-offset slot of the current row header, then moves on
+   * to the next field.
+   *
+   * @param currentAddr Current write address, used to locate the row start
+   * @param length Value to store: the field's offset within the row, or NULL_FIELD_OFFSET
+   */
+  private void putFieldHeader(long currentAddr, int length) {
+    long currentHeaderAddr = currentAddr - curOffset + curFieldOffset;
+
+    PlatformDependent.putInt(currentHeaderAddr, length);
+    curFieldOffset += SizeOf.SIZE_OF_INT;
+    curFieldIdx++;
+  }
+
+  @Override
+  public void putByte(byte val) {
+    ensureSize(SizeOf.SIZE_OF_BYTE);
+    long addr = currentAddr();
+
+    PlatformDependent.putByte(addr, val);
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_BYTE);
+  }
+
+  @Override
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    long addr = currentAddr();
+
+    PlatformDependent.putByte(addr, (byte) (val ? 0x01 : 0x00));
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_BOOL);
+  }
+
+  @Override
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    long addr = currentAddr();
+
+    PlatformDependent.putShort(addr, val);
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_SHORT);
+  }
+
+  @Override
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, val);
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_INT);
+  }
+
+  @Override
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    long addr = currentAddr();
+
+    PlatformDependent.putLong(addr, val);
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_LONG);
+  }
+
+  @Override
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, Float.floatToRawIntBits(val));
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_INT);
+  }
+
+  @Override
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    long addr = currentAddr();
+
+    PlatformDependent.putLong(addr, Double.doubleToRawLongBits(val));
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_LONG);
+  }
+
+  @Override
+  public void putText(String val) {
+    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
+    putText(bytes);
+  }
+
+  @Override
+  public void putText(byte[] val) {
+    putBlob(val);
+  }
+
+  /** Writes a 4-byte length prefix followed by the raw bytes. */
+  @Override
+  public void putBlob(byte[] val) {
+    int bytesLen = val.length;
+    int fieldLen = SizeOf.SIZE_OF_INT + bytesLen;
+
+    ensureSize(fieldLen);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, bytesLen);
+    PlatformDependent.copyMemory(val, 0, addr + SizeOf.SIZE_OF_INT, bytesLen);
+    putFieldHeader(addr, curOffset);
+    forwardField(fieldLen);
+  }
+
+  @Override
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  @Override
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  @Override
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  /** Writes the months (4 bytes) followed by the milliseconds (8 bytes). */
+  @Override
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    long addr = currentAddr();
+
+    PlatformDependent.putInt(addr, val.getMonths());
+    PlatformDependent.putLong(addr + SizeOf.SIZE_OF_INT, val.getMilliSeconds());
+    putFieldHeader(addr, curOffset);
+    forwardField(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+  }
+
+  @Override
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  @Override
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+
+  /**
+   * Appends a whole tuple. An {@link UnSafeTuple} is copied byte-for-byte, using
+   * {@code getLength()} bytes from its address. Any other tuple is handed to
+   * {@link OffHeapRowBlockUtils#convert}.
+   *
+   * NOTE(review): the byte-copy path does not call {@link #startRow()}/{@link #endRow()}, so
+   * subclasses that keep per-row bookkeeping in endRow() (e.g. row counting) are bypassed here.
+   * Confirm that callers account for this.
+   */
+  @Override
+  public void addTuple(Tuple tuple) {
+    if (tuple instanceof UnSafeTuple) {
+      UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class);
+      int length = unSafeTuple.getLength();
+      ensureSize(length);
+      PlatformDependent.copyMemory(unSafeTuple.address(), address() + position(), length);
+      forward(length);
+    } else {
+      OffHeapRowBlockUtils.convert(tuple, this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
new file mode 100644
index 0000000..614b3fb
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the maximum size or increasing ratio. In addition,
+ * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
+ * due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOVerflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+      if (overflowedSize > Integer.MAX_VALUE) {
+        overflowedSize = Integer.MAX_VALUE;
+      }
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+      if (limitBytes > Integer.MAX_VALUE) {
+        this.limitBytes = Integer.MAX_VALUE;
+      } else {
+        this.limitBytes = limitBytes;
+      }
+    }
+
+    this.allowedOVerflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    if (currentSize > Integer.MAX_VALUE) {
+      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
+      return Integer.MAX_VALUE;
+    }
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
new file mode 100644
index 0000000..22c2561
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+public class ResizableMemoryBlock implements MemoryBlock {
+  private static final Log LOG = LogFactory.getLog(ResizableMemoryBlock.class);
+
+  protected ByteBuf buffer;
+  protected ResizableLimitSpec limitSpec;
+
+  public ResizableMemoryBlock(ByteBuf buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
+    this.limitSpec = limitSpec;
+  }
+
+  public ResizableMemoryBlock(ByteBuf buffer) {
+    this(buffer, new ResizableLimitSpec(buffer.capacity()));
+  }
+
+  public ResizableMemoryBlock(ByteBuffer buffer) {
+    this.buffer = Unpooled.wrappedBuffer(buffer).order(ByteOrder.LITTLE_ENDIAN);
+    this.limitSpec = new ResizableLimitSpec(buffer.capacity());
+  }
+
+  public ResizableMemoryBlock(ResizableLimitSpec limitSpec, boolean isDirect) {
+    if (isDirect) {
+      this.buffer = BufferPool.directBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit());
+    } else {
+      this.buffer = BufferPool.heapBuffer((int) limitSpec.initialSize(), (int) limitSpec.limit());
+    }
+    this.limitSpec = limitSpec;
+  }
+
+  @Override
+  public long address() {
+    return buffer.memoryAddress();
+  }
+
+  @Override
+  public boolean hasAddress() {
+    return buffer.hasMemoryAddress();
+  }
+
+  @Override
+  public int capacity() {
+    return buffer.capacity();
+  }
+
+  @Override
+  public void clear() {
+    buffer.clear();
+  }
+
+  @Override
+  public boolean isReadable() {
+    return buffer.isReadable();
+  }
+
+  @Override
+  public int readableBytes() {
+    return buffer.readableBytes();
+  }
+
+  @Override
+  public int readerPosition() {
+    return buffer.readerIndex();
+  }
+
+  @Override
+  public void readerPosition(int pos) {
+    buffer.readerIndex(pos);
+  }
+
+  @Override
+  public boolean isWritable() {
+    return buffer.isWritable();
+  }
+
+  @Override
+  public int writableBytes() {
+    return buffer.writableBytes();
+  }
+
+  @Override
+  public void writerPosition(int pos) {
+    buffer.writerIndex(pos);
+  }
+
+  @Override
+  public int writerPosition() {
+    return buffer.writerIndex();
+  }
+
+
+  @Override
+  public void ensureSize(int size) {
+    if (!buffer.isWritable(size)) {
+      if (!limitSpec.canIncrease(buffer.capacity())) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(buffer.capacity());
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  private void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the capacity limit");
+    }
+
+    if (newSize < buffer.capacity()) {
+      LOG.warn("The capacity reduction is ignored.");
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    buffer = BufferPool.ensureWritable(buffer, newBlockSize);
+  }
+
+  @Override
+  public void release() {
+    buffer.release();
+  }
+
+  @Override
+  public MemoryBlock duplicate() {
+    return new ResizableMemoryBlock(buffer.duplicate().readerIndex(0), limitSpec);
+  }
+
+  @Override
+  public ByteBuf getBuffer() {
+    return buffer;
+  }
+
+  @Override
+  public int writeBytes(ScatteringByteChannel channel) throws IOException {
+
+    if (buffer.readableBytes() > 0) {
+      this.buffer.markReaderIndex();
+      this.buffer.discardReadBytes();  // compact the buffer
+    } else {
+      buffer.clear();
+    }
+
+    int readBytes = 0;
+    while (buffer.writableBytes() > 0) {
+      int localReadBytes = buffer.writeBytes(channel, buffer.writableBytes());
+      if (localReadBytes < 0) {
+        break;
+      }
+      readBytes += localReadBytes;
+    }
+
+    return readBytes;
+  }
+
+  @Override
+  public int getBytes(byte[] bytes, int dstIndex, int length) throws IOException {
+    int readableBytes = buffer.readableBytes();
+    buffer.readBytes(bytes, dstIndex, length);
+    return readableBytes - buffer.readableBytes();
+  }
+
+  @Override
+  public int getInt(int index) {
+    return buffer.getInt(index);
+  }
+
+  @Override
+  public int writeTo(GatheringByteChannel channel, int length) throws IOException {
+    return buffer.readBytes(channel, length);
+  }
+
+  @Override
+  public int writeTo(GatheringByteChannel channel) throws IOException {
+    return buffer.readBytes(channel, buffer.readableBytes());
+  }
+
+  @Override
+  public int writeTo(OutputStream outputStream, int length) throws IOException {
+    buffer.readBytes(outputStream, length);
+    return length;
+  }
+
+  @Override
+  public int writeTo(OutputStream outputStream) throws IOException {
+    int readableBytes = buffer.readableBytes();
+    buffer.readBytes(outputStream, readableBytes);
+    return readableBytes - buffer.readableBytes();
+  }
+
+  @Override
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(capacity(), false) + "," + limitSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
new file mode 100644
index 0000000..68902fb
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowBlock.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.tuple.RowBlockReader;
+
+import java.io.IOException;
+import java.nio.channels.ScatteringByteChannel;
+
+/**
+ * A block of serialized rows: its schema, the memory that holds the rows, and the number of
+ * rows recorded so far.
+ */
+public interface RowBlock {
+
+  /* ---------------- schema and storage ---------------- */
+
+  /** @return the data types of the fields of each row */
+  TajoDataTypes.DataType[] getDataTypes();
+
+  /** @return the memory block the rows are written into */
+  MemoryBlock getMemory();
+
+  /** @return the capacity of this block (presumably in bytes; confirm against implementations) */
+  int capacity();
+
+  /* ---------------- row count ---------------- */
+
+  /** @return the number of rows currently recorded for this block */
+  int rows();
+
+  /**
+   * Overwrites the recorded row count.
+   *
+   * @param rowNum the new row count
+   */
+  void setRows(int rowNum);
+
+  /* ---------------- access ---------------- */
+
+  RowBlockReader getReader();
+
+  RowWriter getWriter();
+
+  /**
+   * Fills this block from the given channel.
+   *
+   * NOTE(review): the meaning of the returned flag is defined by implementations; confirm.
+   */
+  boolean copyFromChannel(ScatteringByteChannel channel) throws IOException;
+
+  /* ---------------- lifecycle ---------------- */
+
+  void clear();
+
+  void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
new file mode 100644
index 0000000..0393714
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java
@@ -0,0 +1,80 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Serializes rows one field at a time. Calls must follow this sequence:
+ *
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX calls must equal the number of fields.
+ * {@link #addTuple(Tuple)} presumably writes an entire row at once; see implementations.
+ */
+public interface RowWriter {
+
+  /** @return the data types of the fields this writer serializes */
+  TajoDataTypes.DataType[] dataTypes();
+
+  /* ---------------- row lifecycle ---------------- */
+
+  boolean startRow();
+
+  void skipField();
+
+  void endRow();
+
+  void clear();
+
+  /* ---------------- fixed-width values ---------------- */
+
+  void putBool(boolean val);
+
+  void putByte(byte val);
+
+  void putInt2(short val);
+
+  void putInt4(int val);
+
+  void putInt8(long val);
+
+  void putFloat4(float val);
+
+  void putFloat8(double val);
+
+  /* ---------------- temporal and network values ---------------- */
+
+  void putDate(int val);
+
+  void putTime(long val);
+
+  void putTimestamp(long val);
+
+  void putInterval(IntervalDatum val);
+
+  void putInet4(int val);
+
+  /* ---------------- variable-length values ---------------- */
+
+  void putText(String val);
+
+  void putText(byte[] val);
+
+  void putBlob(byte[] val);
+
+  void putProtoDatum(ProtobufDatum datum);
+
+  /* ---------------- whole rows ---------------- */
+
+  void addTuple(Tuple tuple);
+}


Mime
View raw message