tajo-commits mailing list archives

From hyun...@apache.org
Subject [2/6] TAJO-907: Implement off-heap tuple block and zero-copy tuple.
Date Tue, 16 Sep 2014 11:06:09 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..4a9313f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
@@ -0,0 +1,63 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  OffHeapRowBlock rowBlock;
+
+  // Read States
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
+    this.rowBlock = rowBlock;
+  }
+
+  public long remainForRead() {
+    return rowBlock.memorySize - curPosForRead;
+  }
+
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (curRowIdxForRead < rowBlock.rows()) {
+
+      long recordStartPtr = rowBlock.address() + curPosForRead;
+      int recordLen = UNSAFE.getInt(recordStartPtr);
+      tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
+
+      curPosForRead += recordLen;
+      curRowIdxForRead++;
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void reset() {
+    curPosForRead = 0;
+    curRowIdxForRead = 0;
+  }
+}
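
For illustration only (not part of this patch), a minimal read-loop sketch, assuming a populated OffHeapRowBlock built elsewhere in this change; process(...) is a hypothetical placeholder, not a Tajo API:

    // Illustrative sketch: iterate a filled row block with the zero-copy reader.
    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    ZeroCopyTuple tuple = new ZeroCopyTuple();   // reused for every row; next() repositions it
    while (reader.next(tuple)) {
      process(tuple);                            // hypothetical callback over the current row
    }
    reader.reset();                              // rewind to read the block again from row 0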

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..147f7be
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OffHeapRowBlockUtils {
+
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    List<Tuple> tupleList = Lists.newArrayList();
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    while(reader.next(zcTuple)) {
+      tupleList.add(zcTuple);
+      zcTuple = new ZeroCopyTuple();
+    }
+    Collections.sort(tupleList, comparator);
+    return tupleList;
+  }
+
+  public static Tuple [] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    Tuple [] tuples = new Tuple[rowBlock.rows()];
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
+      tuples[i] = zcTuple;
+      zcTuple = new ZeroCopyTuple();
+    }
+    Arrays.sort(tuples, comparator);
+    return tuples;
+  }
+}
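
For illustration only (not part of this patch), a sketch of calling the sort helper; rowBlock is assumed to be a populated OffHeapRowBlock whose first column is INT4:

    // Illustrative sketch: sort the tuples of a row block by the first (INT4) column.
    Comparator<Tuple> byCol0 = new Comparator<Tuple>() {
      @Override
      public int compare(Tuple a, Tuple b) {
        int x = a.get(0).asInt4();
        int y = b.get(0).asInt4();
        return x < y ? -1 : (x == y ? 0 : 1);
      }
    };
    List<Tuple> sorted = OffHeapRowBlockUtils.sort(rowBlock, byCol0);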

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..d177e0c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  public long address() {
+    return rowBlock.address();
+  }
+
+  public int position() {
+    return rowBlock.position();
+  }
+
+  @Override
+  public void forward(int length) {
+    rowBlock.position(position() + length);
+  }
+
+  public void ensureSize(int size) {
+    rowBlock.ensureSize(size);
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    rowBlock.setRows(rowBlock.rows() + 1);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.dataTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..31bc206
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import sun.misc.Unsafe;
+
+/**
+ *
+ * Row Record Structure
+ *
+ * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
+ *                              4 bytes          4 bytes               4 bytes
+ *
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** record size + offset list */
+  private final int headerSize;
+  /** field offsets */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  private int curFieldIdx;
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    fieldOffsets = new int[dataTypes.length];
+    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+  }
+
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  public abstract long address();
+
+  public abstract void ensureSize(int size);
+
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Forwards the current write position.
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  public boolean startRow() {
+    curOffset = headerSize;
+    curFieldIdx = 0;
+    return true;
+  }
+
+  public void endRow() {
+    long rowHeaderPos = address() + position();
+    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
+    rowHeaderPos += SizeOf.SIZE_OF_INT;
+
+    for (int i = 0; i < curFieldIdx; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+    for (int i = curFieldIdx; i < dataTypes.length; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+
+    // curOffset now equals the byte length of this row.
+    forward(curOffset);
+  }
+
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  private void forwardField() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
+
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  public void putText(String val) {
+    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
+    putText(bytes);
+  }
+
+  public void putText(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putBlob(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    long offset = recordStartAddr() + curOffset;
+    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
+    offset += SizeOf.SIZE_OF_INT;
+    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}
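
For illustration only (not part of this patch), a worked example of the record layout described in the class comment, for a hypothetical 3-column row (INT4, INT8, TEXT "abc"):

    // Illustrative sketch: offsets produced for one row of (INT4, INT8, TEXT "abc").
    int headerSize = 4 * (3 + 1);   // row length + 3 field offsets = 16 bytes
    int off0 = headerSize;          // INT4 value starts right after the header (offset 16)
    int off1 = off0 + 4;            // INT8 value at offset 20
    int off2 = off1 + 8;            // TEXT at offset 28: 4-byte length prefix, then 3 bytes
    int rowLen = off2 + 4 + 3;      // 35 bytes in total; endRow() stores this in the first 4 bytes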

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the maximum buffer size and the increase ratio. In addition,
+ * it guarantees that all sizes are less than or equal to Integer.MAX_VALUE (2^31 - 1),
+ * the maximum capacity of a ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOVerflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+      if (overflowedSize > Integer.MAX_VALUE) {
+        overflowedSize = Integer.MAX_VALUE;
+      }
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+      if (limitBytes > Integer.MAX_VALUE) {
+        this.limitBytes = Integer.MAX_VALUE;
+      } else {
+        this.limitBytes = limitBytes;
+      }
+    }
+
+    this.allowedOVerflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    if (currentSize > Integer.MAX_VALUE) {
+      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
+      return Integer.MAX_VALUE;
+    }
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}
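
For illustration only (not part of this patch), a sketch of the growth behavior: with the default increase ratio of 1.0f the buffer roughly doubles on each call until the overflow-adjusted limit caps it. The 64KB/1MB figures are arbitrary:

    // Illustrative sketch: 64KB initial size, 1MB limit (raised to ~1.1MB by the overflow ratio).
    ResizableLimitSpec spec = new ResizableLimitSpec(64 * 1024, 1024 * 1024);
    int size = spec.increasedSize(0);   // below initSize, so it returns the initial 64KB
    size = spec.increasedSize(size);    // ~128KB
    size = spec.increasedSize(size);    // ~256KB, and so on until the limit is reached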

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..59f8d1b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * The call sequence should be as follows:
+ *
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must be equal to the number of fields.
+ */
+public interface RowWriter {
+
+  public TajoDataTypes.DataType [] dataTypes();
+
+  public boolean startRow();
+
+  public void endRow();
+
+  public void skipField();
+
+  public void putBool(boolean val);
+
+  public void putInt2(short val);
+
+  public void putInt4(int val);
+
+  public void putInt8(long val);
+
+  public void putFloat4(float val);
+
+  public void putFloat8(double val);
+
+  public void putText(String val);
+
+  public void putText(byte [] val);
+
+  public void putBlob(byte[] val);
+
+  public void putTimestamp(long val);
+
+  public void putTime(long val);
+
+  public void putDate(int val);
+
+  public void putInterval(IntervalDatum val);
+
+  public void putInet4(int val);
+
+  public void putProtoDatum(ProtobufDatum datum);
+}
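
For illustration only (not part of this patch), a sketch of the call sequence above for a hypothetical 3-column schema (INT4, TEXT, FLOAT8); writer is assumed to be a RowWriter obtained for an off-heap row block elsewhere in this patch:

    // Illustrative sketch: one row written through the RowWriter protocol.
    writer.startRow();
    writer.putInt4(42);         // field 0
    writer.putText("hello");    // field 1
    writer.skipField();         // field 2 is left NULL
    writer.endRow();            // writes the row length and the three field offsets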

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..7ba753b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,308 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.sun.tools.javac.util.Convert;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private DirectBuffer bb;
+  private int relativePos;
+  private int length;
+  private DataType [] types;
+
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    ((ByteBuffer) bb).clear();
+    if (length < tuple.length) {
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      this.relativePos = 0;
+      this.length = tuple.length;
+    }
+
+    ((ByteBuffer) bb).put(tuple.nioBuffer());
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, len);
+    return new String(bytes, TextDatum.DEFAULT_CHARSET);
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, len);
+    return Convert.utf2chars(bytes);
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly accesses the UTF-8 bytes of a TEXT field in an UnSafeTuple without any copy. It is used by the compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}
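
For illustration only (not part of this patch), a sketch of how the comparator is meant to be driven; the column index 4 (a TEXT field) and the two UnSafeTuple instances are assumptions:

    // Illustrative sketch: compare the TEXT field of two tuples without building Strings.
    long leftAddr  = leftTuple.getFieldAddr(4);    // address of the 4-byte length prefix
    long rightAddr = rightTuple.getFieldAddr(4);
    int cmp = UnSafeTupleBytesComparator.compare(leftAddr, rightAddr);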

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class ZeroCopyTuple extends UnSafeTuple {
+
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  @Override
+  public void release() {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
index bcb0cbe..f5c8a08 100644
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ b/tajo-storage/src/main/proto/IndexProtos.proto
@@ -25,5 +25,7 @@ option java_generate_equals_and_hash = true;
 import "CatalogProtos.proto";
 
 message TupleComparatorProto {
-  repeated TupleComparatorSpecProto compSpecs = 1;
+  required SchemaProto schema = 1;
+  repeated SortSpecProto sortSpecs = 2;
+  repeated TupleComparatorSpecProto compSpecs = 3;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index 4669477..b227e9d 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -40,7 +40,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -53,6 +53,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.directraw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.rcfile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -94,6 +98,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.directraw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.v2.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
@@ -175,6 +184,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.directraw.class</name>
+    <value>org.apache.tajo.storage.rawfile.DirectRawFileWriter</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
index ab7c2b2..639ca04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -69,7 +69,7 @@ public class TestTupleComparator {
     SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
     SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
 
-    TupleComparator tc = new TupleComparator(schema,
+    BaseTupleComparator tc = new BaseTupleComparator(schema,
         new SortSpec[] {sortKey1, sortKey2});
     assertEquals(-1, tc.compare(tuple1, tuple2));
     assertEquals(1, tc.compare(tuple2, tuple1));

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 54798a4..29ce226 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -74,7 +74,8 @@ public class TestBSTIndex {
   public static Collection<Object[]> generateParameters() {
     return Arrays.asList(new Object[][]{
         {StoreType.CSV},
-        {StoreType.RAW}
+        {StoreType.RAW},
+        {StoreType.DIRECTRAW}
     });
   }
 
@@ -91,15 +92,15 @@ public class TestBSTIndex {
     Path tablePath = new Path(testDir, "testFindValue_" + storeType);
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
-    Tuple tuple;
+    Tuple key;
     for (int i = 0; i < TUPLE_NUM; i++) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
+      key = new VTuple(5);
+      key.put(0, DatumFactory.createInt4(i));
+      key.put(1, DatumFactory.createInt8(i));
+      key.put(2, DatumFactory.createFloat8(i));
+      key.put(3, DatumFactory.createFloat4(i));
+      key.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(key);
     }
     appender.close();
 
@@ -115,7 +116,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"),
@@ -132,11 +133,11 @@ public class TestBSTIndex {
     while (true) {
       keyTuple = new VTuple(2);
       offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
+      key = scanner.next();
+      if (key == null) break;
 
-      keyTuple.put(0, tuple.get(1));
-      keyTuple.put(1, tuple.get(2));
+      keyTuple.put(0, key.get(1));
+      keyTuple.put(1, key.get(2));
       creater.write(keyTuple, offset);
     }
 
@@ -144,29 +145,29 @@ public class TestBSTIndex {
     creater.close();
     scanner.close();
 
-    tuple = new VTuple(keySchema.size());
+    key = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
     reader.open();
     scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
-      tuple.put(0, DatumFactory.createInt8(i));
-      tuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(tuple);
+      key.put(0, DatumFactory.createInt8(i));
+      key.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(key);
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
-      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+      Tuple found = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (found.get(1).asInt8()) + "]", (i) == (found.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (found.get(2).asFloat8()) + "]", (i) == (found.get(2).asFloat8()));
 
       offsets = reader.next();
       if (offsets == -1) {
         continue;
       }
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
+      found = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(1).asInt8()));
     }
     reader.close();
     scanner.close();
@@ -189,7 +190,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
@@ -197,19 +198,19 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    Tuple tuple;
+    Tuple key;
     long offset;
     for (int i = 0; i < TUPLE_NUM; i++) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
+      key = new VTuple(5);
+      key.put(0, DatumFactory.createInt4(i));
+      key.put(1, DatumFactory.createInt8(i));
+      key.put(2, DatumFactory.createFloat8(i));
+      key.put(3, DatumFactory.createFloat4(i));
+      key.put(4, DatumFactory.createText("field_" + i));
 
       offset = appender.getOffset();
-      appender.addTuple(tuple);
-      creater.write(tuple, offset);
+      appender.addTuple(key);
+      creater.write(key, offset);
     }
     appender.flush();
     appender.close();
@@ -222,7 +223,7 @@ public class TestBSTIndex {
     long fileLen = status.getLen();
     FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
 
-    tuple = new VTuple(keySchema.size());
+    key = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
@@ -230,22 +231,23 @@ public class TestBSTIndex {
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
-      tuple.put(0, DatumFactory.createInt8(i));
-      tuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(tuple);
+      key.put(0, DatumFactory.createInt8(i));
+      key.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(key);
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8()));
-      assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8()));
+
+      Tuple found = scanner.next();
+      assertTrue("[seek check " + (i) + " ]", (i) == (found.get(1).asInt8()));
+      assertTrue("[seek check " + (i) + " ]", (i) == (found.get(2).asFloat8()));
 
       offsets = reader.next();
       if (offsets == -1) {
         continue;
       }
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4()));
-      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8()));
+      found = scanner.next();
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(0).asInt4()));
+      assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (found.get(1).asInt8()));
     }
     reader.close();
     scanner.close();
@@ -281,7 +283,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"),
@@ -336,6 +338,7 @@ public class TestBSTIndex {
       tuple.put(2, DatumFactory.createFloat8(i));
       tuple.put(3, DatumFactory.createFloat4(i));
       tuple.put(4, DatumFactory.createText("field_" + i));
+
       appender.addTuple(tuple);
     }
     appender.close();
@@ -352,7 +355,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
@@ -442,7 +445,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
@@ -522,7 +525,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
@@ -604,7 +607,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"),
@@ -709,7 +712,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"),
@@ -765,15 +768,15 @@ public class TestBSTIndex {
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
-    Tuple tuple;
+    Tuple key;
     for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
-      tuple = new VTuple(5);
-      tuple.put(0, DatumFactory.createInt4(i));
-      tuple.put(1, DatumFactory.createInt8(i));
-      tuple.put(2, DatumFactory.createFloat8(i));
-      tuple.put(3, DatumFactory.createFloat4(i));
-      tuple.put(4, DatumFactory.createText("field_" + i));
-      appender.addTuple(tuple);
+      key = new VTuple(5);
+      key.put(0, DatumFactory.createInt4(i));
+      key.put(1, DatumFactory.createInt8(i));
+      key.put(2, DatumFactory.createFloat8(i));
+      key.put(3, DatumFactory.createFloat4(i));
+      key.put(4, DatumFactory.createText("field_" + i));
+      appender.addTuple(key);
     }
     appender.close();
 
@@ -789,7 +792,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
 
     BSTIndex bst = new BSTIndex(conf);
@@ -806,11 +809,11 @@ public class TestBSTIndex {
     while (true) {
       keyTuple = new VTuple(2);
       offset = scanner.getNextOffset();
-      tuple = scanner.next();
-      if (tuple == null) break;
+      key = scanner.next();
+      if (key == null) break;
 
-      keyTuple.put(0, tuple.get(1));
-      keyTuple.put(1, tuple.get(2));
+      keyTuple.put(0, key.get(1));
+      keyTuple.put(1, key.get(2));
       creater.write(keyTuple, offset);
     }
 
@@ -818,7 +821,7 @@ public class TestBSTIndex {
     creater.close();
     scanner.close();
 
-    tuple = new VTuple(keySchema.size());
+    key = new VTuple(keySchema.size());
 
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
         keySchema, comp);
@@ -827,22 +830,22 @@ public class TestBSTIndex {
     scanner.init();
 
     for (int i = (TUPLE_NUM - 1); i > 0; i--) {
-      tuple.put(0, DatumFactory.createInt8(i));
-      tuple.put(1, DatumFactory.createFloat8(i));
-      long offsets = reader.find(tuple);
+      key.put(0, DatumFactory.createInt8(i));
+      key.put(1, DatumFactory.createFloat8(i));
+      long offsets = reader.find(key);
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8()));
-      assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8()));
+      Tuple found = scanner.next();
+      assertTrue("seek check [" + (i) + " ," + (found.get(1).asInt8()) + "]", (i) == (found.get(1).asInt8()));
+      assertTrue("seek check [" + (i) + " ," + (found.get(2).asFloat8()) + "]", (i) == (found.get(2).asFloat8()));
 
       offsets = reader.next();
       if (offsets == -1) {
         continue;
       }
       scanner.seek(offsets);
-      tuple = scanner.next();
-      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4()));
-      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8()));
+      found = scanner.next();
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (found.get(0).asInt4()));
+      assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (found.get(1).asInt8()));
     }
     reader.close();
     scanner.close();
@@ -880,7 +883,7 @@ public class TestBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 53a2531..85c0334 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -104,7 +104,7 @@ public class TestSingleCSVFileBSTIndex {
     keySchema.addColumn(new Column("long", Type.INT8));
     keySchema.addColumn(new Column("double", Type.FLOAT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
 
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
@@ -193,7 +193,7 @@ public class TestSingleCSVFileBSTIndex {
     keySchema.addColumn(new Column("int", Type.INT4));
     keySchema.addColumn(new Column("long", Type.INT8));
 
-    TupleComparator comp = new TupleComparator(keySchema, sortKeys);
+    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
     
     BSTIndex bst = new BSTIndex(conf);
     BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"),

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
new file mode 100644
index 0000000..8d2f00d
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
@@ -0,0 +1,318 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.raw;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.*;
+import static org.junit.Assert.*;
+
+public class TestDirectRawFile {
+  private static final Log LOG = LogFactory.getLog(TestDirectRawFile.class);
+
+  public static Path writeRowBlock(TajoConf conf, TableMeta meta, OffHeapRowBlock rowBlock, Path outputFile)
+      throws IOException {
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile);
+    writer.init();
+    writer.writeRowBlock(rowBlock);
+    writer.close();
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+    return outputFile;
+  }
+
+  public static Path writeRowBlock(TajoConf conf, TableMeta meta, OffHeapRowBlock rowBlock) throws IOException {
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outputFile = new Path(testDir, "output.draw");
+    return writeRowBlock(conf, meta, rowBlock, outputFile);
+  }
+
+  //@Test
+  public void testRWForAllTypes() throws IOException {
+    int rowNum = 50000;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    Path outputFile = writeRowBlock(conf, meta, rowBlock);
+    rowBlock.release();
+
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+
+    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
+    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(readBlock);
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    int j = 0;
+
+    while(reader.next(readBlock)) {
+      blockReader.reset();
+      while (blockReader.next(tuple)) {
+        TestOffHeapRowBlock.validateTupleResult(j, tuple);
+        j++;
+      }
+    }
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    readBlock.release();
+
+    assertEquals(rowNum, j);
+  }
+
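+  // Reads the rows back one tuple at a time through reader.next(), rather than block-at-a-time.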
+  @Test
+  public void testRWForAllTypesWithNextTuple() throws IOException {
+    int rowNum = 10000;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    Path outputFile = writeRowBlock(conf, meta, rowBlock);
+    rowBlock.release();
+
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    int j = 0;
+    Tuple tuple;
+    while ((tuple = reader.next()) != null) {
+      TestOffHeapRowBlock.validateTupleResult(j, tuple);
+      j++;
+    }
+
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    assertEquals(rowNum, j);
+  }
+
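+  // Once the file is exhausted, next() must keep returning null on every subsequent call.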
+  @Test
+  public void testRepeatedScan() throws IOException {
+    int rowNum = 2;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    Path outputFile = writeRowBlock(conf, meta, rowBlock);
+
+    rowBlock.release();
+
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    int j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+
+    reader.close();
+  }
+
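+  // reset() must rewind the scanner so the whole file can be read again from the start.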
+  @Test
+  public void testReset() throws IOException {
+    int rowNum = 2;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    Path outputFile = writeRowBlock(conf, meta, rowBlock);
+    rowBlock.release();
+
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    int j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+
+    reader.reset();
+
+    j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+    reader.close();
+  }
+
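+  // Writes tuples one-by-one via addTuple() instead of writeRowBlock(), then reads them back.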
+  //@Test
+  public void testRWWithAddTupleForAllTypes() throws IOException {
+    int rowNum = 10;
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum);
+    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(rowBlock);
+
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outputFile = new Path(testDir, "output.draw");
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile);
+    writer.init();
+
+    blockReader.reset();
+    int i = 0;
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    while(blockReader.next(tuple)) {
+      writer.addTuple(tuple);
+    }
+    writer.close();
+    rowBlock.release();
+
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
+    OffHeapRowBlockReader blockReader2 = new OffHeapRowBlockReader(readBlock);
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    tuple = new ZeroCopyTuple();
+    int j = 0;
+    while(reader.next(readBlock)) {
+      blockReader2.reset();
+      while (blockReader2.next(tuple)) {
+        TestOffHeapRowBlock.validateTupleResult(j, tuple);
+        j++;
+      }
+    }
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    readBlock.release();
+
+    assertEquals(rowNum, j);
+  }
+
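+  // Writes rows produced by fillRowBlockWithNull and checks each field's nullity on read-back.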
+  //@Test
+  public void testNullityValidation() throws IOException {
+    int rowNum = 1000;
+
+    long allocateStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
+
+    Path testDir = CommonTestingUtil.getTestDir();
+    Path outputFile = new Path(testDir, "output.draw");
+    TajoConf conf = new TajoConf();
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, schema, meta, outputFile);
+    writer.init();
+    writer.writeRowBlock(rowBlock);
+    writer.close();
+    rowBlock.release();
+
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + FileUtil.humanReadableByteCount(status.getLen(), false));
+
+    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
+    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(readBlock);
+    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    int j = 0;
+
+    while (reader.next(readBlock)) {
+      blockReader.reset();
+      while (blockReader.next(tuple)) {
+        validateNullity(j, tuple);
+        j++;
+      }
+    }
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    readBlock.release();
+
+    assertEquals(rowNum, j);
+  }
+}
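
Stripped of assertions and timing, the block-at-a-time read path these tests exercise is the same throughout: DirectRawFileScanner refills an OffHeapRowBlock, and an OffHeapRowBlockReader hands out zero-copy tuples over that block. A sketch of the pattern used above (schema, conf, meta and outputFile stand in for the test fixtures; the 1 MB block size is simply the value the tests choose):

    OffHeapRowBlock readBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 1);
    OffHeapRowBlockReader blockReader = new OffHeapRowBlockReader(readBlock);
    DirectRawFileScanner reader = new DirectRawFileScanner(conf, schema, meta, outputFile);
    reader.init();

    ZeroCopyTuple tuple = new ZeroCopyTuple();
    while (reader.next(readBlock)) {     // refill the off-heap block from the file
      blockReader.reset();               // rewind the reader over the freshly filled block
      while (blockReader.next(tuple)) {  // tuple is a zero-copy view into the block's memory
        // consume tuple
      }
    }

    reader.close();
    readBlock.release();                 // off-heap memory must be released explicitly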

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index 357dadb..6366141 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -219,7 +219,7 @@ public class TestStorages {
           || storeType == StoreType.CSV
           || storeType == StoreType.PARQUET
           || storeType == StoreType.AVRO) {
-        assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
+        assertTrue(tuple.get(0) == null || tuple.get(0).isNull());
       }
       assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
       assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..b332364
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.Test;
+
+public class TestBaseTupleBuilder {
+
+  @Test
+  public void testBuild() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    HeapTuple heapTuple = null;
+    ZeroCopyTuple zcTuple = null;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
+
+      i++;
+    }
+  }
+
+  @Test
+  public void testBuildWithNull() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    HeapTuple heapTuple = null;
+    ZeroCopyTuple zcTuple = null;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateNullity(i, heapTuple);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateNullity(i, zcTuple);
+
+      i++;
+    }
+  }
+}
\ No newline at end of file
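
The builder round-trip exercised here follows one pattern: copy a source tuple into the builder with RowStoreUtil.convert, then materialize the row either on the heap or as a zero-copy tuple. A sketch, with schema and sourceTuple standing in for the test fixtures above:

    BaseTupleBuilder builder = new BaseTupleBuilder(schema);

    RowStoreUtil.convert(sourceTuple, builder);               // copy every field of sourceTuple into the builder

    HeapTuple onHeap = builder.buildToHeapTuple();            // materializes the built row on the heap
    ZeroCopyTuple zeroCopy = builder.buildToZeroCopyTuple();  // materializes it as a zero-copy tuple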

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
new file mode 100644
index 0000000..96f465a
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
@@ -0,0 +1,45 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.catalog.SchemaUtil;
+import org.junit.Test;
+
+public class TestHeapTuple {
+
+  @Test
+  public void testHeapTuple() {
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    int i = 0;
+    while (reader.next(zcTuple)) {
+      byte [] bytes = new byte[zcTuple.nioBuffer().limit()];
+      zcTuple.nioBuffer().get(bytes);
+
+      HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+      i++;
+    }
+
+    rowBlock.release();
+  }
+}
\ No newline at end of file

