tajo-commits mailing list archives

From jh...@apache.org
Subject [2/4] tajo git commit: TAJO-1738: Improve off-heap RowBlock.
Date Thu, 03 Sep 2015 12:48:24 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
deleted file mode 100644
index 4a9313f..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.tuple.RowBlockReader;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  OffHeapRowBlock rowBlock;
-
-  // Read States
-  private int curRowIdxForRead;
-  private int curPosForRead;
-
-  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
-    this.rowBlock = rowBlock;
-  }
-
-  public long remainForRead() {
-    return rowBlock.memorySize - curPosForRead;
-  }
-
-  @Override
-  public boolean next(ZeroCopyTuple tuple) {
-    if (curRowIdxForRead < rowBlock.rows()) {
-
-      long recordStartPtr = rowBlock.address() + curPosForRead;
-      int recordLen = UNSAFE.getInt(recordStartPtr);
-      tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
-
-      curPosForRead += recordLen;
-      curRowIdxForRead++;
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public void reset() {
-    curPosForRead = 0;
-    curRowIdxForRead = 0;
-  }
-}

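For context only (not part of this commit), a minimal sketch of how the reader removed above was typically driven: next() re-points a reusable ZeroCopyTuple at each record and reset() rewinds the read state. The helper name and the assumption that the row block has already been filled are illustrative.

class RowBlockReaderExample {
  // Illustrative only: count the rows in an already-populated OffHeapRowBlock.
  static int countRows(OffHeapRowBlock rowBlock) {
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    ZeroCopyTuple tuple = new ZeroCopyTuple();  // reused; next() re-points it at each record
    int rows = 0;
    while (reader.next(tuple)) {                // advances by the 4-byte record length stored at the record start
      rows++;
    }
    reader.reset();                             // rewinds the read position and row index to zero
    return rows;
  }
}
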
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
deleted file mode 100644
index dbc3188..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.tajo.storage.Tuple;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-public class OffHeapRowBlockUtils {
-
-  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    List<Tuple> tupleList = Lists.newArrayList();
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    while(reader.next(zcTuple)) {
-      tupleList.add(zcTuple);
-      zcTuple = new ZeroCopyTuple();
-    }
-    Collections.sort(tupleList, comparator);
-    return tupleList;
-  }
-
-  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
-    Tuple[] tuples = new Tuple[rowBlock.rows()];
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
-      tuples[i] = zcTuple;
-      zcTuple = new ZeroCopyTuple();
-    }
-    Arrays.sort(tuples, comparator);
-    return tuples;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
deleted file mode 100644
index d177e0c..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-
-public class OffHeapRowBlockWriter extends OffHeapRowWriter {
-  OffHeapRowBlock rowBlock;
-
-  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
-    super(rowBlock.dataTypes);
-    this.rowBlock = rowBlock;
-  }
-
-  public long address() {
-    return rowBlock.address();
-  }
-
-  public int position() {
-    return rowBlock.position();
-  }
-
-  @Override
-  public void forward(int length) {
-    rowBlock.position(position() + length);
-  }
-
-  public void ensureSize(int size) {
-    rowBlock.ensureSize(size);
-  }
-
-  @Override
-  public void endRow() {
-    super.endRow();
-    rowBlock.setRows(rowBlock.rows() + 1);
-  }
-
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return rowBlock.dataTypes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
deleted file mode 100644
index 85c7e0b..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-
-/**
- *
- * Row Record Structure
- *
- * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
- *                              4 bytes          4 bytes               4 bytes
- *
- */
-public abstract class OffHeapRowWriter implements RowWriter {
-  /** record size + offset list */
-  private final int headerSize;
-  /** field offsets */
-  private final int [] fieldOffsets;
-  private final TajoDataTypes.DataType [] dataTypes;
-
-  private int curFieldIdx;
-  private int curOffset;
-
-  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
-    this.dataTypes = dataTypes;
-    fieldOffsets = new int[dataTypes.length];
-    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
-  }
-
-  public void clear() {
-    curOffset = 0;
-    curFieldIdx = 0;
-  }
-
-  public long recordStartAddr() {
-    return address() + position();
-  }
-
-  public abstract long address();
-
-  public abstract void ensureSize(int size);
-
-  public int offset() {
-    return curOffset;
-  }
-
-  /**
-   * Current position
-   *
-   * @return The position
-   */
-  public abstract int position();
-
-  /**
-   * Forward the address;
-   *
-   * @param length Length to be forwarded
-   */
-  public abstract void forward(int length);
-
-  @Override
-  public TajoDataTypes.DataType[] dataTypes() {
-    return dataTypes;
-  }
-
-  public boolean startRow() {
-    curOffset = headerSize;
-    curFieldIdx = 0;
-    return true;
-  }
-
-  public void endRow() {
-    long rowHeaderPos = address() + position();
-    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
-    rowHeaderPos += SizeOf.SIZE_OF_INT;
-
-    for (int i = 0; i < curFieldIdx; i++) {
-      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
-      rowHeaderPos += SizeOf.SIZE_OF_INT;
-    }
-    for (int i = curFieldIdx; i < dataTypes.length; i++) {
-      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
-      rowHeaderPos += SizeOf.SIZE_OF_INT;
-    }
-
-    // rowOffset is equivalent to a byte length of this row.
-    forward(curOffset);
-  }
-
-  public void skipField() {
-    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  private void forwardField() {
-    fieldOffsets[curFieldIdx++] = curOffset;
-  }
-
-  public void putBool(boolean val) {
-    ensureSize(SizeOf.SIZE_OF_BOOL);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
-
-    curOffset += SizeOf.SIZE_OF_BOOL;
-  }
-
-  public void putInt2(short val) {
-    ensureSize(SizeOf.SIZE_OF_SHORT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_SHORT;
-  }
-
-  public void putInt4(int val) {
-    ensureSize(SizeOf.SIZE_OF_INT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_INT;
-  }
-
-  public void putInt8(long val) {
-    ensureSize(SizeOf.SIZE_OF_LONG);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_LONG;
-  }
-
-  public void putFloat4(float val) {
-    ensureSize(SizeOf.SIZE_OF_FLOAT);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_FLOAT;
-  }
-
-  public void putFloat8(double val) {
-    ensureSize(SizeOf.SIZE_OF_DOUBLE);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
-    curOffset += SizeOf.SIZE_OF_DOUBLE;
-  }
-
-  public void putText(String val) {
-    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
-    putText(bytes);
-  }
-
-  public void putText(byte[] val) {
-    int bytesLen = val.length;
-
-    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
-    curOffset += SizeOf.SIZE_OF_INT;
-
-    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
-        recordStartAddr() + curOffset, bytesLen);
-    curOffset += bytesLen;
-  }
-
-  public void putBlob(byte[] val) {
-    int bytesLen = val.length;
-
-    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
-    forwardField();
-
-    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
-    curOffset += SizeOf.SIZE_OF_INT;
-
-    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
-        recordStartAddr() + curOffset, bytesLen);
-    curOffset += bytesLen;
-  }
-
-  public void putTimestamp(long val) {
-    putInt8(val);
-  }
-
-  public void putDate(int val) {
-    putInt4(val);
-  }
-
-  public void putTime(long val) {
-    putInt8(val);
-  }
-
-  public void putInterval(IntervalDatum val) {
-    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
-    forwardField();
-
-    long offset = recordStartAddr() + curOffset;
-    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
-    offset += SizeOf.SIZE_OF_INT;
-    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
-    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
-  }
-
-  public void putInet4(int val) {
-    putInt4(val);
-  }
-
-  public void putProtoDatum(ProtobufDatum val) {
-    putBlob(val.asByteArray());
-  }
-}

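To make the record layout documented in the OffHeapRowWriter comment above concrete, here is a worked size calculation for a hypothetical three-column row; the column types (INT4, INT8, TEXT) and the value "ab" are assumptions for illustration only.

class RowLayoutExample {
  // Illustrative only: header/offset arithmetic for a 3-column row (INT4, INT8, TEXT "ab").
  static int rowLengthForThreeColumns() {
    int numFields  = 3;
    int headerSize = 4 * (numFields + 1);  // 4-byte row length plus one 4-byte offset per field = 16
    int off0 = headerSize;                 // INT4 payload starts at byte 16
    int off1 = off0 + 4;                   // INT8 payload starts at byte 20
    int off2 = off1 + 8;                   // TEXT payload starts at byte 28 (4-byte length prefix + "ab")
    return off2 + 4 + 2;                   // total record length: 34 bytes, written at offset 0 by endRow()
  }
}
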
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
deleted file mode 100644
index 14e67b2..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.FileUtil;
-
-/**
- * It specifies the maximum size or increasing ratio. In addition,
- * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
- * due to ByteBuffer.
- */
-public class ResizableLimitSpec {
-  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
-
-  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
-  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
-
-  private final long initSize;
-  private final long limitBytes;
-  private final float incRatio;
-  private final float allowedOVerflowRatio;
-  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
-  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
-
-  public ResizableLimitSpec(long initSize) {
-    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes) {
-    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
-    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
-  }
-
-  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
-    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
-    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
-    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
-    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
-    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
-
-    if (initSize == limitBytes) {
-      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
-
-      if (overflowedSize > Integer.MAX_VALUE) {
-        overflowedSize = Integer.MAX_VALUE;
-      }
-
-      this.initSize = overflowedSize;
-      this.limitBytes = overflowedSize;
-    } else {
-      this.initSize = initSize;
-      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
-
-      if (limitBytes > Integer.MAX_VALUE) {
-        this.limitBytes = Integer.MAX_VALUE;
-      } else {
-        this.limitBytes = limitBytes;
-      }
-    }
-
-    this.allowedOVerflowRatio = allowedOverflowRatio;
-    this.incRatio = incRatio;
-  }
-
-  public long initialSize() {
-    return initSize;
-  }
-
-  public long limit() {
-    return limitBytes;
-  }
-
-  public float remainRatio(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    if (currentSize > Integer.MAX_VALUE) {
-      currentSize = Integer.MAX_VALUE;
-    }
-    return (float)currentSize / (float)limitBytes;
-  }
-
-  public boolean canIncrease(long currentSize) {
-    return remain(currentSize) > 0;
-  }
-
-  public long remain(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
-    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
-  }
-
-  public int increasedSize(int currentSize) {
-    if (currentSize < initSize) {
-      return (int) initSize;
-    }
-
-    if (currentSize > Integer.MAX_VALUE) {
-      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
-      return Integer.MAX_VALUE;
-    }
-    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
-
-    if (nextSize > limitBytes) {
-      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
-      nextSize = limitBytes;
-    }
-
-    if (nextSize > Integer.MAX_VALUE) {
-      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
-      nextSize = Integer.MAX_VALUE;
-    }
-
-    return (int) nextSize;
-  }
-
-  @Override
-  public String toString() {
-    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
-        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
-        + ",inc_ratio=" + incRatio;
-  }
-}

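As a reference (not part of this commit), a minimal sketch of how the removed ResizableLimitSpec grows a buffer: with the default increase ratio of 1.0 each step doubles the size until it reaches the limit, which itself is padded by the allowed overflow ratio. The concrete sizes below (1 MB initial, 64 MB limit) are illustrative assumptions.

class LimitSpecExample {
  // Illustrative only: doubling growth capped at the (overflow-padded) limit.
  static int growToLimit() {
    ResizableLimitSpec spec = new ResizableLimitSpec(1024 * 1024, 64L * 1024 * 1024);
    int size = (int) spec.initialSize();  // 1 MB
    while (spec.canIncrease(size) && size < spec.limit()) {
      size = spec.increasedSize(size);    // 2 MB, 4 MB, ... then capped at roughly 1.1 x 64 MB
    }
    return size;
  }
}
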
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
deleted file mode 100644
index a2b2561..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-
-/**
- * The call sequence should be as follows:
- *
- * <pre>
- *   startRow() -->  skipField() or putXXX --> endRow()
- * </pre>
- *
- * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
- */
-public interface RowWriter {
-
-  public TajoDataTypes.DataType [] dataTypes();
-
-  public boolean startRow();
-
-  public void endRow();
-
-  public void skipField();
-
-  public void putBool(boolean val);
-
-  public void putInt2(short val);
-
-  public void putInt4(int val);
-
-  public void putInt8(long val);
-
-  public void putFloat4(float val);
-
-  public void putFloat8(double val);
-
-  public void putText(String val);
-
-  public void putText(byte[] val);
-
-  public void putBlob(byte[] val);
-
-  public void putTimestamp(long val);
-
-  public void putTime(long val);
-
-  public void putDate(int val);
-
-  public void putInterval(IntervalDatum val);
-
-  public void putInet4(int val);
-
-  public void putProtoDatum(ProtobufDatum datum);
-}

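A minimal sketch (not part of this commit) of the startRow() / putXXX() / skipField() / endRow() contract described in the interface comment above; the three column types assumed here (INT4, a skipped NULL field, FLOAT8) are illustrative only and must match the writer's dataTypes().

class RowWriterExample {
  // Illustrative only: exactly one putXXX() or skipField() call per field, bracketed by startRow()/endRow().
  static void writeOneRow(RowWriter writer) {
    writer.startRow();
    writer.putInt4(42);        // field 0
    writer.skipField();        // field 1 written as NULL
    writer.putFloat8(3.14d);   // field 2
    writer.endRow();
  }
}
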
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
deleted file mode 100644
index 3756064..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-import org.apache.tajo.util.datetime.TimeMeta;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public abstract class UnSafeTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  private DirectBuffer bb;
-  private int relativePos;
-  private int length;
-  private DataType [] types;
-
-  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    this.bb = (DirectBuffer) bb;
-    this.relativePos = relativePos;
-    this.length = length;
-    this.types = types;
-  }
-
-  void set(ByteBuffer bb, DataType [] types) {
-    set(bb, 0, bb.limit(), types);
-  }
-
-  @Override
-  public int size() {
-    return types.length;
-  }
-
-  @Override
-  public TajoDataTypes.Type type(int fieldId) {
-    return types[fieldId].getType();
-  }
-
-  @Override
-  public int size(int fieldId) {
-    return UNSAFE.getInt(getFieldAddr(fieldId));
-  }
-
-  public ByteBuffer nioBuffer() {
-    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
-  }
-
-  public HeapTuple toHeapTuple() {
-    byte [] bytes = new byte[length];
-    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
-    return new HeapTuple(bytes, types);
-  }
-
-  public void copyFrom(UnSafeTuple tuple) {
-    Preconditions.checkNotNull(tuple);
-
-    ((ByteBuffer) bb).clear();
-    if (length < tuple.length) {
-      UnsafeUtil.free((ByteBuffer) bb);
-      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
-      this.relativePos = 0;
-      this.length = tuple.length;
-    }
-
-    ((ByteBuffer) bb).put(tuple.nioBuffer());
-  }
-
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(bb.address() + (long)(relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
-  }
-
-  public long getFieldAddr(int fieldId) {
-    int fieldOffset = getFieldOffset(fieldId);
-    if (fieldOffset == -1) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return bb.address() + relativePos + fieldOffset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isBlank(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isBlankOrNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public Datum asDatum(int fieldId) {
-    if (isBlankOrNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
-    }
-  }
-
-  @Override
-  public void clearOffset() {
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public byte[] getTextBytes(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte[] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    long addr = getFieldAddr(fieldId);
-    return UNSAFE.getShort(addr);
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(getFieldAddr(fieldId));
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return new String(getTextBytes(fieldId));
-  }
-
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int months = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
-  }
-
-  @Override
-  public TimeMeta getTimeDate(int fieldId) {
-    return null;
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return toHeapTuple();
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
-    for (int i = 0; i < size(); i++) {
-      if (contains(i)) {
-        datums[i] = asDatum(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-
-  public abstract void release();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
deleted file mode 100644
index 73e1e2f..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedLongs;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-import java.nio.ByteOrder;
-
-/**
- * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
- */
-public class UnSafeTupleBytesComparator {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  static final boolean littleEndian =
-      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-  public static int compare(long ptr1, long ptr2) {
-    int lstrLen = UNSAFE.getInt(ptr1);
-    int rstrLen = UNSAFE.getInt(ptr2);
-
-    ptr1 += SizeOf.SIZE_OF_INT;
-    ptr2 += SizeOf.SIZE_OF_INT;
-
-    int minLength = Math.min(lstrLen, rstrLen);
-    int minWords = minLength / Longs.BYTES;
-
-        /*
-         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-         * time is no slower than comparing 4 bytes at a time even on 32-bit.
-         * On the other hand, it is substantially faster on 64-bit.
-         */
-    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-      long lw = UNSAFE.getLong(ptr1);
-      long rw = UNSAFE.getLong(ptr2);
-      long diff = lw ^ rw;
-
-      if (diff != 0) {
-        if (!littleEndian) {
-          return UnsignedLongs.compare(lw, rw);
-        }
-
-        // Use binary search
-        int n = 0;
-        int y;
-        int x = (int) diff;
-        if (x == 0) {
-          x = (int) (diff >>> 32);
-          n = 32;
-        }
-
-        y = x << 16;
-        if (y == 0) {
-          n += 16;
-        } else {
-          x = y;
-        }
-
-        y = x << 8;
-        if (y == 0) {
-          n += 8;
-        }
-        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-      }
-
-      ptr1 += SizeOf.SIZE_OF_LONG;
-      ptr2 += SizeOf.SIZE_OF_LONG;
-    }
-
-    // The epilogue to cover the last (minLength % 8) elements.
-    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return lstrLen - rstrLen;
-  }
-}

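For orientation (not part of this commit), a simplified on-heap sketch of the comparison the class comment above describes: unsigned, byte-wise lexicographic order with the shorter value ordered first on a common prefix. It omits the 8-byte word fast path and is not a drop-in replacement for the removed off-heap code.

class BytesComparatorSketch {
  // Illustrative only: unsigned lexicographic comparison of two byte arrays.
  static int compareBytes(byte[] left, byte[] right) {
    int min = Math.min(left.length, right.length);
    for (int i = 0; i < min; i++) {
      int diff = (left[i] & 0xFF) - (right[i] & 0xFF);  // compare as unsigned bytes
      if (diff != 0) {
        return diff;
      }
    }
    return left.length - right.length;  // equal common prefix: shorter value sorts first
  }
}
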
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
deleted file mode 100644
index 51dbb29..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class ZeroCopyTuple extends UnSafeTuple {
-
-  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
-    super.set(bb, relativePos, length, types);
-  }
-
-  @Override
-  public void release() {
-    // nothing to do
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index dfdff85..676c072 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
+    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -56,6 +56,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.draw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.rcfile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -101,6 +105,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.draw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
   </property>
@@ -134,7 +143,7 @@
     <name>tajo.storage.scanner-handler.hbase.class</name>
     <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
   </property>
-  
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
@@ -157,6 +166,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.draw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
   </property>
@@ -212,4 +226,4 @@
     <value>131072</value>
     <description>128KB write buffer</description>
   </property>
-</configuration>
+</configuration>
\ No newline at end of file

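For illustration only, a sketch of how the new "draw" handler keys added to storage-default.xml above could be resolved by store type. The use of Hadoop's Configuration here is an assumption made for the example, not a statement of how Tajo wires these keys up internally.

import org.apache.hadoop.conf.Configuration;

class HandlerLookupSketch {
  // Illustrative only: map a store type (e.g. "draw") to its configured scanner class.
  static Class<?> scannerClassFor(Configuration conf, String storeType) {
    String key = "tajo.storage.scanner-handler." + storeType + ".class";
    return conf.getClass(key, null);  // e.g. org.apache.tajo.storage.rawfile.DirectRawFileScanner for "draw"
  }
}
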
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
deleted file mode 100644
index b332364..0000000
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.tuple.offheap.*;
-import org.junit.Test;
-
-public class TestBaseTupleBuilder {
-
-  @Test
-  public void testBuild() {
-    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
-
-    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
-    OffHeapRowBlockReader reader = rowBlock.getReader();
-
-    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
-
-    HeapTuple heapTuple = null;
-    ZeroCopyTuple zcTuple = null;
-    int i = 0;
-    while(reader.next(inputTuple)) {
-      RowStoreUtil.convert(inputTuple, builder);
-
-      heapTuple = builder.buildToHeapTuple();
-      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
-
-      zcTuple = builder.buildToZeroCopyTuple();
-      TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
-
-      i++;
-    }
-  }
-
-  @Test
-  public void testBuildWithNull() {
-    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
-
-    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
-    OffHeapRowBlockReader reader = rowBlock.getReader();
-
-    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
-
-    HeapTuple heapTuple = null;
-    ZeroCopyTuple zcTuple = null;
-    int i = 0;
-    while(reader.next(inputTuple)) {
-      RowStoreUtil.convert(inputTuple, builder);
-
-      heapTuple = builder.buildToHeapTuple();
-      TestOffHeapRowBlock.validateNullity(i, heapTuple);
-
-      zcTuple = builder.buildToZeroCopyTuple();
-      TestOffHeapRowBlock.validateNullity(i, zcTuple);
-
-      i++;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
deleted file mode 100644
index 96f465a..0000000
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.catalog.SchemaUtil;
-import org.junit.Test;
-
-public class TestHeapTuple {
-
-  @Test
-  public void testHeapTuple() {
-    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    int i = 0;
-    while (reader.next(zcTuple)) {
-      byte [] bytes = new byte[zcTuple.nioBuffer().limit()];
-      zcTuple.nioBuffer().get(bytes);
-
-      HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
-      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
-      i++;
-    }
-
-    rowBlock.release();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
deleted file mode 100644
index 278d733..0000000
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.ProtoUtil;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.tajo.common.TajoDataTypes.Type;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestOffHeapRowBlock {
-  private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class);
-  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
-  public static Schema schema;
-
-  static {
-    schema = new Schema();
-    schema.addColumn("col0", Type.BOOLEAN);
-    schema.addColumn("col1", Type.INT2);
-    schema.addColumn("col2", Type.INT4);
-    schema.addColumn("col3", Type.INT8);
-    schema.addColumn("col4", Type.FLOAT4);
-    schema.addColumn("col5", Type.FLOAT8);
-    schema.addColumn("col6", Type.TEXT);
-    schema.addColumn("col7", Type.TIMESTAMP);
-    schema.addColumn("col8", Type.DATE);
-    schema.addColumn("col9", Type.TIME);
-    schema.addColumn("col10", Type.INTERVAL);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12",
-        CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName()));
-  }
-
-  private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) {
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (endTime - startTime) + " msec");
-  }
-
-  @Test
-  public void testPutAndReadValidation() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRow(i, rowBlock.getWriter());
-
-      reader.reset();
-      int j = 0;
-      while(reader.next(tuple)) {
-        validateTupleResult(j, tuple);
-
-        j++;
-      }
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
-
-    long readStart = System.currentTimeMillis();
-    tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateTupleResult(j, tuple);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  @Test
-  public void testNullityValidation() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRowBlockWithNull(i, rowBlock.getWriter());
-
-      reader.reset();
-      int j = 0;
-      while(reader.next(tuple)) {
-        validateNullity(j, tuple);
-
-        j++;
-      }
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
-
-    long readStart = System.currentTimeMillis();
-    tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateNullity(j, tuple);
-
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  @Test
-  public void testEmptyRow() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      rowBlock.getWriter().startRow();
-      // empty columns
-      rowBlock.getWriter().endRow();
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing tooks " + (writeEnd - writeStart) + " msec");
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-    rowBlock.release();
-
-    assertEquals(rowNum, j);
-    assertEquals(rowNum, rowBlock.rows());
-  }
-
-  @Test
-  public void testSortBenchmark() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    reader.reset();
-    while(reader.next(tuple)) {
-      unSafeTuples.add(tuple);
-      tuple = new ZeroCopyTuple();
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4));
-    BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec});
-
-    long sortStart = System.currentTimeMillis();
-    Collections.sort(unSafeTuples, comparator);
-    long sortEnd = System.currentTimeMillis();
-    LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
-    rowBlock.release();
-  }
-
-  @Test
-  public void testVTuplePutAndGetBenchmark() {
-    int rowNum = 1000;
-
-    List<VTuple> rowBlock = Lists.newArrayList();
-    long writeStart = System.currentTimeMillis();
-    VTuple tuple;
-    for (int i = 0; i < rowNum; i++) {
-      tuple = new VTuple(schema.size());
-      fillVTuple(i, tuple);
-      rowBlock.add(tuple);
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
-
-    long readStart = System.currentTimeMillis();
-    int j = 0;
-    for (VTuple t : rowBlock) {
-      validateTupleResult(j, t);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    int count = 0;
-    for (int l = 0; l < rowBlock.size(); l++) {
-      for(int m = 0; m < schema.size(); m++ ) {
-        if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
-          count ++;
-        }
-      }
-    }
-    // For preventing unnecessary code elimination optimization.
-    LOG.info("The number of INT4 values is " + count + ".");
-  }
-
-  @Test
-  public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100);
-
-    long writeStart = System.currentTimeMillis();
-    VTuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < rowNum; i++) {
-      fillVTuple(i, tuple);
-
-      RowStoreUtil.convert(tuple, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
-
-    validateResults(rowBlock);
-    rowBlock.release();
-  }
-
-  @Test
-  public void testSerDerOfRowBlock() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-
-    ByteBuffer bb = rowBlock.nioBuffer();
-    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
-    validateResults(restoredRowBlock);
-    rowBlock.release();
-  }
-
-  @Test
-  public void testSerDerOfZeroCopyTuple() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-
-    ByteBuffer bb = rowBlock.nioBuffer();
-    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock);
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    ZeroCopyTuple copyTuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      ByteBuffer copy = tuple.nioBuffer();
-      copyTuple.set(copy, SchemaUtil.toDataTypes(schema));
-
-      validateTupleResult(j, copyTuple);
-
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  public static OffHeapRowBlock createRowBlock(int rowNum) {
-    long allocateStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
-    long allocatedEnd = System.currentTimeMillis();
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (allocatedEnd - allocateStart) + " msec");
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRow(i, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
-
-    return rowBlock;
-  }
-
-  public static OffHeapRowBlock createRowBlockWithNull(int rowNum) {
-    long allocateStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
-    long allocatedEnd = System.currentTimeMillis();
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (allocatedEnd - allocateStart) + " msec");
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRowBlockWithNull(i, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
-
-    return rowBlock;
-  }
-
-  public static void fillRow(int i, RowWriter builder) {
-    builder.startRow();
-    builder.putBool(i % 1 == 0 ? true : false); // 0
-    builder.putInt2((short) 1);                 // 1
-    builder.putInt4(i);                         // 2
-    builder.putInt8(i);                         // 3
-    builder.putFloat4(i);                       // 4
-    builder.putFloat8(i);                       // 5
-    builder.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
-    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
-    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
-    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
-    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
-    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
-    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
-    builder.endRow();
-  }
-
-  public static void fillRowBlockWithNull(int i, RowWriter writer) {
-    writer.startRow();
-
-    if (i == 0) {
-      writer.skipField();
-    } else {
-      writer.putBool(i % 1 == 0 ? true : false); // 0
-    }
-    if (i % 1 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt2((short) 1);                 // 1
-    }
-
-    if (i % 2 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt4(i);                         // 2
-    }
-
-    if (i % 3 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt8(i);                         // 3
-    }
-
-    if (i % 4 == 0) {
-      writer.skipField();
-    } else {
-      writer.putFloat4(i);                       // 4
-    }
-
-    if (i % 5 == 0) {
-      writer.skipField();
-    } else {
-      writer.putFloat8(i);                       // 5
-    }
-
-    if (i % 6 == 0) {
-      writer.skipField();
-    } else {
-      writer.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
-    }
-
-    if (i % 7 == 0) {
-      writer.skipField();
-    } else {
-      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
-    }
-
-    if (i % 8 == 0) {
-      writer.skipField();
-    } else {
-      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
-    }
-
-    if (i % 9 == 0) {
-      writer.skipField();
-    } else {
-      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
-    }
-
-    if (i % 10 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
-    }
-
-    if (i % 11 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
-    }
-
-    if (i % 12 == 0) {
-      writer.skipField();
-    } else {
-      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
-    }
-
-    writer.endRow();
-  }
-
-  public static void fillVTuple(int i, VTuple tuple) {
-    tuple.put(0, DatumFactory.createBool(i % 1 == 0));
-    tuple.put(1, DatumFactory.createInt2((short) 1));
-    tuple.put(2, DatumFactory.createInt4(i));
-    tuple.put(3, DatumFactory.createInt8(i));
-    tuple.put(4, DatumFactory.createFloat4(i));
-    tuple.put(5, DatumFactory.createFloat8(i));
-    tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes()));
-    tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
-    tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
-    tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
-    tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
-    tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
-    tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
-  }
-
-  public static void validateResults(OffHeapRowBlock rowBlock) {
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    int j = 0;
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateTupleResult(j, tuple);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("Reading takes " + (readEnd - readStart) + " msec");
-  }
-
-  public static void validateTupleResult(int j, Tuple t) {
-    assertTrue((j % 1 == 0) == t.getBool(0));
-    assertTrue(1 == t.getInt2(1));
-    assertEquals(j, t.getInt4(2));
-    assertEquals(j, t.getInt8(3));
-    assertTrue(j == t.getFloat4(4));
-    assertTrue(j == t.getFloat8(5));
-    assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6));
-    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
-    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
-    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
-    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
-    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
-  }
-
-  public static void validateNullity(int j, Tuple tuple) {
-    if (j == 0) {
-      tuple.isBlankOrNull(0);
-    } else {
-      assertTrue((j % 1 == 0) == tuple.getBool(0));
-    }
-
-    if (j % 1 == 0) {
-      tuple.isBlankOrNull(1);
-    } else {
-      assertTrue(1 == tuple.getInt2(1));
-    }
-
-    if (j % 2 == 0) {
-      tuple.isBlankOrNull(2);
-    } else {
-      assertEquals(j, tuple.getInt4(2));
-    }
-
-    if (j % 3 == 0) {
-      tuple.isBlankOrNull(3);
-    } else {
-      assertEquals(j, tuple.getInt8(3));
-    }
-
-    if (j % 4 == 0) {
-      tuple.isBlankOrNull(4);
-    } else {
-      assertTrue(j == tuple.getFloat4(4));
-    }
-
-    if (j % 5 == 0) {
-      tuple.isBlankOrNull(5);
-    } else {
-      assertTrue(j == tuple.getFloat8(5));
-    }
-
-    if (j % 6 == 0) {
-      tuple.isBlankOrNull(6);
-    } else {
-      assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
-    }
-
-    if (j % 7 == 0) {
-      tuple.isBlankOrNull(7);
-    } else {
-      assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
-    }
-
-    if (j % 8 == 0) {
-      tuple.isBlankOrNull(8);
-    } else {
-      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
-    }
-
-    if (j % 9 == 0) {
-      tuple.isBlankOrNull(9);
-    } else {
-      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
-    }
-
-    if (j % 10 == 0) {
-      tuple.isBlankOrNull(10);
-    } else {
-      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
-    }
-
-    if (j % 11 == 0) {
-      tuple.isBlankOrNull(11);
-    } else {
-      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
-    }
-
-    if (j % 12 == 0) {
-      tuple.isBlankOrNull(12);
-    } else {
-      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
-    }
-  }
-}
\ No newline at end of file

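For orientation, the benchmark file removed above exercises one pattern throughout: fill an OffHeapRowBlock through its RowWriter, then scan it back with an OffHeapRowBlockReader into a reusable ZeroCopyTuple. A minimal sketch of that round trip, assuming the pre-patch classes and the test's own schema/fillRow helpers shown above:

  OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
  for (int i = 0; i < rowNum; i++) {
    fillRow(i, rowBlock.getWriter());          // startRow()/put*/endRow() per record
  }

  OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
  ZeroCopyTuple tuple = new ZeroCopyTuple();   // reused across next() calls, as in the read benchmark
  reader.reset();
  int rows = 0;
  while (reader.next(tuple)) {                 // iterates until all written rows are consumed
    rows++;
  }
  rowBlock.release();                          // off-heap memory must be freed explicitly

The patch replaces this API with the MemoryRowBlock/RowBlockReader pair used in the scanner and writer hunks further down.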
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
deleted file mode 100644
index 1eb9c17..0000000
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.unit.StorageUnit;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestResizableSpec {
-
-  @Test
-  public void testResizableLimit() {
-    ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
-
-    long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
-
-    assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
-
-    assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
-
-    assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
-
-    assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
-
-    assertFalse(limit.canIncrease(limit.limit()));
-  }
-
-  @Test
-  public void testFixedLimit() {
-    FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
-
-    assertEquals(limit.limit(), 100 * StorageUnit.MB);
-
-    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
-
-    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
-
-    assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
-
-    assertFalse(limit.canIncrease(limit.limit()));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index f637da0..8a9b9ea 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -38,7 +38,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
+    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -55,6 +55,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.draw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.rcfile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -100,6 +104,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.draw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
   </property>
@@ -116,7 +125,7 @@
 
   <property>
     <name>tajo.storage.scanner-handler.orc.class</name>
-    <value>org.apache.tajo.storage.orc.OrcScanner</value>
+    <value>org.apache.tajo.storage.orc.ORCScanner</value>
   </property>
 
   <property>
@@ -137,7 +146,7 @@
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>text,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+    <value>text,raw,draw,rcfile,row,parquet,sequencefile,avro,hbase</value>
   </property>
 
   <property>
@@ -156,6 +165,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.draw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
   </property>
@@ -211,4 +225,4 @@
     <value>131072</value>
     <description>128KB write buffer</description>
   </property>
-</configuration>
+</configuration>
\ No newline at end of file

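The storage-default.xml additions register the new "draw" (direct raw) store type end to end: its fragments map to FileFragment, reads go through DirectRawFileScanner, and writes through DirectRawFileWriter. Purely as an illustration of how such keyed properties are typically consumed (this is not Tajo's actual handler loader; only the property-name pattern comes from the XML above):

  import org.apache.hadoop.conf.Configuration;

  // Hypothetical sketch: resolve a scanner class from the configuration key pattern above.
  public class HandlerLookupSketch {
    static Class<?> scannerClassFor(Configuration conf, String storeType) throws ClassNotFoundException {
      String key = "tajo.storage.scanner-handler." + storeType + ".class";
      return Class.forName(conf.get(key));   // "draw" -> org.apache.tajo.storage.rawfile.DirectRawFileScanner
    }
  }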
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
index 211bc14..7480fa3 100644
--- a/tajo-storage/tajo-storage-hbase/pom.xml
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -233,6 +233,10 @@
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -306,6 +310,12 @@
       <artifactId>hbase-server</artifactId>
       <version>${hbase.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
index 8ae9a26..0172484 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -25,15 +25,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
-import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.tuple.memory.RowBlock;
+import org.apache.tajo.tuple.memory.UnSafeTuple;
+import org.apache.tajo.tuple.memory.ZeroCopyTuple;
 import org.apache.tajo.unit.StorageUnit;
 
 import java.io.File;
@@ -44,30 +46,27 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class);
 
   private SeekableInputChannel channel;
-  private TajoDataTypes.DataType[] columnTypes;
 
-  private boolean eof = false;
+  private boolean eos = false;
   private long fileSize;
   private long recordCount;
+  private long filePosition;
+  private long endOffset;
 
-  private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple();
-  private OffHeapRowBlock tupleBuffer;
-  private OffHeapRowBlockReader reader;
+  private ZeroCopyTuple unSafeTuple = new UnSafeTuple();
+  private RowBlock tupleBuffer;
+  private RowBlockReader reader;
 
-  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
     super(conf, schema, meta, fragment);
   }
 
   public void init() throws IOException {
     initChannel();
 
-    columnTypes = new TajoDataTypes.DataType[schema.size()];
-    for (int i = 0; i < schema.size(); i++) {
-      columnTypes[i] = schema.getColumn(i).getDataType();
-    }
+    tupleBuffer = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), 64 * StorageUnit.KB, true);
 
-    tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    reader = new OffHeapRowBlockReader(tupleBuffer);
+    reader = tupleBuffer.getReader();
 
     fetchNeeded = !next(tupleBuffer);
 
@@ -90,44 +89,55 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
       }
 
       channel = new LocalFileInputChannel(new FileInputStream(file));
-      channel.seek(fragment.getStartKey());
       fileSize = channel.size();
     } else {
       channel = new FSDataInputChannel(fs.open(fragment.getPath()));
-      channel.seek(fragment.getStartKey());
       fileSize = channel.size();
     }
 
+    // set the initial read position within the fragment
+    if (fragment.getStartKey() > 0) {
+      channel.seek(fragment.getStartKey());
+    }
+
     if (tableStats != null) {
       tableStats.setNumBytes(fileSize);
     }
 
+    filePosition = fragment.getStartKey();
+    endOffset = fragment.getStartKey() + fragment.getLength();
     if (LOG.isDebugEnabled()) {
       LOG.debug("RawFileScanner open:" + fragment.getPath() + ", offset :" +
-          fragment.getStartKey() + ", file size :" + fileSize);
+          fragment.getStartKey() + ", fragment length :" + fragment.getLength());
     }
   }
 
   @Override
   public long getNextOffset() throws IOException {
-    return channel.position() - reader.remainForRead();
+    return filePosition - reader.remainForRead();
   }
 
   @Override
   public void seek(long offset) throws IOException {
     channel.seek(offset);
+    filePosition = channel.position();
+    tupleBuffer.getMemory().clear();
     fetchNeeded = true;
   }
 
-  public boolean next(OffHeapRowBlock rowblock) throws IOException {
-    return rowblock.copyFromChannel(channel, tableStats);
+  public boolean next(RowBlock rowblock) throws IOException {
+    long remain = reader.remainForRead();
+    boolean ret = rowblock.copyFromChannel(channel);
+    reader = rowblock.getReader();
+    filePosition += rowblock.getMemory().writerPosition() - remain;
+    return ret;
   }
 
   private boolean fetchNeeded = true;
 
   @Override
   public Tuple next() throws IOException {
-    if(eof) {
+    if(eos) {
       return null;
     }
 
@@ -136,13 +146,15 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
         if (!next(tupleBuffer)) {
           return null;
         }
-        reader.reset();
       }
 
       fetchNeeded = !reader.next(unSafeTuple);
 
       if (!fetchNeeded) {
         recordCount++;
+        if(filePosition - reader.remainForRead() >= endOffset){
+          eos = true;
+        }
         return unSafeTuple;
       }
     }
@@ -151,8 +163,9 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   @Override
   public void reset() throws IOException {
     // reload initial buffer
-    seek(0);
-    eof = false;
+    filePosition = fragment.getStartKey();
+    seek(filePosition);
+    eos = false;
     reader.reset();
   }
 
@@ -162,8 +175,10 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
       tableStats.setReadBytes(fileSize);
       tableStats.setNumRows(recordCount);
     }
-    tupleBuffer.release();
-    tupleBuffer = null;
+    if(tupleBuffer != null) {
+      tupleBuffer.release();
+      tupleBuffer = null;
+    }
     reader = null;
 
     IOUtils.cleanup(LOG, channel);
@@ -201,7 +216,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
         tableStats.setReadBytes(filePos);
       }
 
-      if(eof || channel == null) {
+      if(eos || channel == null) {
         tableStats.setReadBytes(fileSize);
         return 1.0f;
       }

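The scanner rework above swaps the OffHeapRowBlock/OffHeapRowBlockReader pair for a MemoryRowBlock plus the RowBlockReader it hands out, and adds filePosition/endOffset bookkeeping so a scan stops at its fragment boundary instead of at end of file. From a caller's point of view the flow is unchanged; a rough usage sketch (error handling and stats omitted; process() is a hypothetical consumer):

  DirectRawFileScanner scanner = new DirectRawFileScanner(conf, schema, meta, fragment);
  scanner.init();                  // allocates a 64 KB MemoryRowBlock and obtains its reader
  Tuple t;
  while ((t = scanner.next()) != null) {
    // next() serves tuples from the in-memory reader; when the reader is drained,
    // the scanner refills the row block from the channel and swaps in a fresh reader.
    process(t);                    // hypothetical consumer
  }
  scanner.close();                 // releases the row block and closes the channel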
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
index 912649f..03642a7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -28,38 +28,37 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.tuple.BaseTupleBuilder;
-import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
-import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.unit.StorageUnit;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 public class DirectRawFileWriter extends FileAppender {
   public static final String FILE_EXTENSION = "draw";
+  private static final int BUFFER_SIZE = 64 * StorageUnit.KB;
   private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
 
   private FileChannel channel;
   private RandomAccessFile randomAccessFile;
   private FSDataOutputStream fos;
-  private TajoDataTypes.DataType[] columnTypes;
   private boolean isLocal;
   private long pos;
 
   private TableStatistics stats;
   private ShuffleType shuffleType;
-
-  private BaseTupleBuilder builder;
+  private MemoryRowBlock memoryRowBlock;
 
   public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
                              final Schema schema, final TableMeta meta, final Path path) throws IOException {
@@ -90,12 +89,6 @@ public class DirectRawFileWriter extends FileAppender {
       isLocal = false;
     }
 
-    pos = 0;
-    columnTypes = new TajoDataTypes.DataType[schema.size()];
-    for (int i = 0; i < schema.size(); i++) {
-      columnTypes[i] = schema.getColumn(i).getDataType();
-    }
-
     if (enabledStats) {
       this.stats = new TableStatistics(this.schema);
       this.shuffleType = PlannerUtil.getShuffleType(
@@ -103,52 +96,27 @@ public class DirectRawFileWriter extends FileAppender {
               PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
     }
 
-    builder = new BaseTupleBuilder(schema);
-
+    memoryRowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), BUFFER_SIZE, true);
+    pos = 0;
     super.init();
   }
 
   @Override
   public long getOffset() throws IOException {
-    return pos;
+    return pos + memoryRowBlock.getMemory().writerPosition();
   }
 
-  private long getFilePosition() throws IOException {
-    if (isLocal) {
-      return channel.position();
+  public void writeRowBlock(MemoryRowBlock rowBlock) throws IOException {
+    if(isLocal) {
+      pos += rowBlock.getMemory().writeTo(channel);
     } else {
-      return fos.getPos();
+      pos += rowBlock.getMemory().writeTo(fos);
     }
-  }
 
-  public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException {
-    write(rowBlock.nioBuffer());
-    if (enabledStats) {
-      stats.incrementRows(rowBlock.rows());
-    }
-
-    pos = getFilePosition();
-  }
+    rowBlock.getMemory().clear();
 
-  private ByteBuffer buffer;
-  private void ensureSize(int size) throws IOException {
-    if (buffer.remaining() < size) {
-
-      buffer.limit(buffer.position());
-      buffer.flip();
-      write(buffer);
-
-      buffer.clear();
-    }
-  }
-
-  private void write(ByteBuffer buffer) throws IOException {
-    if(isLocal) {
-      channel.write(buffer);
-    } else {
-      byte[] bytes = new byte[buffer.remaining()];
-      buffer.get(bytes);
-      fos.write(bytes);
+    if (enabledStats) {
+      stats.incrementRows(rowBlock.rows() - stats.getNumRows());
     }
   }
 
@@ -161,43 +129,24 @@ public class DirectRawFileWriter extends FileAppender {
       }
     }
 
-    if (buffer == null) {
-      buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB);
-    }
-
-    UnSafeTuple unSafeTuple;
+    memoryRowBlock.getWriter().addTuple(t);
 
-    if (!(t instanceof UnSafeTuple)) {
-      RowStoreUtil.convert(t, builder);
-      unSafeTuple = builder.buildToZeroCopyTuple();
-    } else {
-      unSafeTuple = (UnSafeTuple) t;
-    }
-
-    ByteBuffer bb = unSafeTuple.nioBuffer();
-    ensureSize(bb.limit());
-    buffer.put(bb);
-
-    pos = getFilePosition() + (buffer.limit() - buffer.remaining());
-
-    if (enabledStats) {
-      stats.incrementRow();
+    if(memoryRowBlock.getMemory().readableBytes() >= BUFFER_SIZE) {
+      writeRowBlock(memoryRowBlock);
     }
   }
 
   @Override
   public void flush() throws IOException {
-    if (buffer != null) {
-      buffer.limit(buffer.position());
-      buffer.flip();
-      write(buffer);
-      buffer.clear();
+    if(memoryRowBlock.getMemory().isReadable()) {
+      writeRowBlock(memoryRowBlock);
     }
   }
 
   @Override
   public void close() throws IOException {
     flush();
+
     if (enabledStats) {
       stats.setNumBytes(getOffset());
     }
@@ -206,6 +155,7 @@ public class DirectRawFileWriter extends FileAppender {
     }
 
     IOUtils.cleanup(LOG, channel, randomAccessFile, fos);
+    memoryRowBlock.release();
   }
 
   @Override

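On the write side, DirectRawFileWriter now accumulates encoded tuples in a MemoryRowBlock and spills it to the underlying channel once roughly 64 KB is buffered; flush() drains whatever remains and close() releases the buffer. A rough usage sketch (tuples is a hypothetical row source; constructor arguments as in the hunk above):

  DirectRawFileWriter writer = new DirectRawFileWriter(conf, taskAttemptId, schema, meta, path);
  writer.init();                   // allocates the 64 KB MemoryRowBlock
  for (Tuple t : tuples) {
    writer.addTuple(t);            // encodes into the row block; spills once >= 64 KB is readable
  }
  writer.close();                  // flush() writes the remaining bytes, then the block is released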
http://git-wip-us.apache.org/repos/asf/tajo/blob/34062ea8/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 3c667bf..2456907 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ByteBufLineReader implements Closeable {
-  public static int DEFAULT_BUFFER = 64 * 1024;
+  public static final int DEFAULT_BUFFER = 64 * 1024;
 
   private int bufferSize;
   private long readBytes;
@@ -100,7 +100,7 @@ public class ByteBufLineReader implements Closeable {
       tailBytes = this.buffer.writerIndex();
       if (!this.buffer.isWritable()) {
        // the line is larger than the current buffer
-        BufferPool.ensureWritable(buffer, bufferSize * 2);
+        this.buffer = BufferPool.ensureWritable(buffer, bufferSize * 2);
         this.bufferSize = buffer.capacity();
       }
       this.startIndex = 0;

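The ByteBufLineReader change is a one-line bug fix: judging from the fix itself, BufferPool.ensureWritable returns the (possibly reallocated, larger) buffer rather than always growing it in place, so dropping the return value left the reader appending to the old, undersized buffer whenever a line exceeded the current capacity. The corrected pattern, as in the hunk above:

  this.buffer = BufferPool.ensureWritable(buffer, bufferSize * 2);  // keep the possibly-new buffer
  this.bufferSize = buffer.capacity();                              // capacity may have changed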
