tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [15/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
Date Wed, 03 Dec 2014 05:30:30 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
deleted file mode 100644
index c43ba38..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.ProtoUtil;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.tajo.common.TajoDataTypes.Type;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestOffHeapRowBlock {
-  private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class);
-  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
-  public static Schema schema;
-
-  static {
-    schema = new Schema();
-    schema.addColumn("col0", Type.BOOLEAN);
-    schema.addColumn("col1", Type.INT2);
-    schema.addColumn("col2", Type.INT4);
-    schema.addColumn("col3", Type.INT8);
-    schema.addColumn("col4", Type.FLOAT4);
-    schema.addColumn("col5", Type.FLOAT8);
-    schema.addColumn("col6", Type.TEXT);
-    schema.addColumn("col7", Type.TIMESTAMP);
-    schema.addColumn("col8", Type.DATE);
-    schema.addColumn("col9", Type.TIME);
-    schema.addColumn("col10", Type.INTERVAL);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12",
-        CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName()));
-  }
-
-  private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) {
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (endTime - startTime) + " msec");
-  }
-
-  @Test
-  public void testPutAndReadValidation() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRow(i, rowBlock.getWriter());
-
-      reader.reset();
-      int j = 0;
-      while(reader.next(tuple)) {
-        validateTupleResult(j, tuple);
-
-        j++;
-      }
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
-
-    long readStart = System.currentTimeMillis();
-    tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateTupleResult(j, tuple);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  @Test
-  public void testNullityValidation() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRowBlockWithNull(i, rowBlock.getWriter());
-
-      reader.reset();
-      int j = 0;
-      while(reader.next(tuple)) {
-        validateNullity(j, tuple);
-
-        j++;
-      }
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
-
-    long readStart = System.currentTimeMillis();
-    tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateNullity(j, tuple);
-
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  @Test
-  public void testEmptyRow() {
-    int rowNum = 1000;
-
-    long allocStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10);
-    long allocEnd = System.currentTimeMillis();
-    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      rowBlock.getWriter().startRow();
-      // empty columns
-      rowBlock.getWriter().endRow();
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing tooks " + (writeEnd - writeStart) + " msec");
-
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-    rowBlock.release();
-
-    assertEquals(rowNum, j);
-    assertEquals(rowNum, rowBlock.rows());
-  }
-
-  @Test
-  public void testSortBenchmark() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-
-    List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    reader.reset();
-    while(reader.next(tuple)) {
-      unSafeTuples.add(tuple);
-      tuple = new ZeroCopyTuple();
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4));
-    BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec});
-
-    long sortStart = System.currentTimeMillis();
-    Collections.sort(unSafeTuples, comparator);
-    long sortEnd = System.currentTimeMillis();
-    LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
-    rowBlock.release();
-  }
-
-  @Test
-  public void testVTuplePutAndGetBenchmark() {
-    int rowNum = 1000;
-
-    List<VTuple> rowBlock = Lists.newArrayList();
-    long writeStart = System.currentTimeMillis();
-    VTuple tuple;
-    for (int i = 0; i < rowNum; i++) {
-      tuple = new VTuple(schema.size());
-      fillVTuple(i, tuple);
-      rowBlock.add(tuple);
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
-
-    long readStart = System.currentTimeMillis();
-    int j = 0;
-    for (VTuple t : rowBlock) {
-      validateTupleResult(j, t);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    int count = 0;
-    for (int l = 0; l < rowBlock.size(); l++) {
-      for(int m = 0; m < schema.size(); m++ ) {
-        if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
-          count ++;
-        }
-      }
-    }
-    // For preventing unnecessary code elimination optimization.
-    LOG.info("The number of INT4 values is " + count + ".");
-  }
-
-  @Test
-  public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100);
-
-    long writeStart = System.currentTimeMillis();
-    VTuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < rowNum; i++) {
-      fillVTuple(i, tuple);
-
-      RowStoreUtil.convert(tuple, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
-
-    validateResults(rowBlock);
-    rowBlock.release();
-  }
-
-  @Test
-  public void testSerDerOfRowBlock() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-
-    ByteBuffer bb = rowBlock.nioBuffer();
-    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
-    validateResults(restoredRowBlock);
-    rowBlock.release();
-  }
-
-  @Test
-  public void testSerDerOfZeroCopyTuple() {
-    int rowNum = 1000;
-
-    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
-
-    ByteBuffer bb = rowBlock.nioBuffer();
-    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock);
-
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    ZeroCopyTuple copyTuple = new ZeroCopyTuple();
-    int j = 0;
-    reader.reset();
-    while(reader.next(tuple)) {
-      ByteBuffer copy = tuple.nioBuffer();
-      copyTuple.set(copy, SchemaUtil.toDataTypes(schema));
-
-      validateTupleResult(j, copyTuple);
-
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("reading takes " + (readEnd - readStart) + " msec");
-
-    rowBlock.release();
-  }
-
-  public static OffHeapRowBlock createRowBlock(int rowNum) {
-    long allocateStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
-    long allocatedEnd = System.currentTimeMillis();
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (allocatedEnd - allocateStart) + " msec");
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRow(i, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
-
-    return rowBlock;
-  }
-
-  public static OffHeapRowBlock createRowBlockWithNull(int rowNum) {
-    long allocateStart = System.currentTimeMillis();
-    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
-    long allocatedEnd = System.currentTimeMillis();
-    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
-        + (allocatedEnd - allocateStart) + " msec");
-
-    long writeStart = System.currentTimeMillis();
-    for (int i = 0; i < rowNum; i++) {
-      fillRowBlockWithNull(i, rowBlock.getWriter());
-    }
-    long writeEnd = System.currentTimeMillis();
-    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
-
-    return rowBlock;
-  }
-
-  public static void fillRow(int i, RowWriter builder) {
-    builder.startRow();
-    builder.putBool(i % 1 == 0 ? true : false); // 0
-    builder.putInt2((short) 1);                 // 1
-    builder.putInt4(i);                         // 2
-    builder.putInt8(i);                         // 3
-    builder.putFloat4(i);                       // 4
-    builder.putFloat8(i);                       // 5
-    builder.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
-    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
-    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
-    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
-    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
-    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
-    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
-    builder.endRow();
-  }
-
-  public static void fillRowBlockWithNull(int i, RowWriter writer) {
-    writer.startRow();
-
-    if (i == 0) {
-      writer.skipField();
-    } else {
-      writer.putBool(i % 1 == 0 ? true : false); // 0
-    }
-    if (i % 1 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt2((short) 1);                 // 1
-    }
-
-    if (i % 2 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt4(i);                         // 2
-    }
-
-    if (i % 3 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInt8(i);                         // 3
-    }
-
-    if (i % 4 == 0) {
-      writer.skipField();
-    } else {
-      writer.putFloat4(i);                       // 4
-    }
-
-    if (i % 5 == 0) {
-      writer.skipField();
-    } else {
-      writer.putFloat8(i);                       // 5
-    }
-
-    if (i % 6 == 0) {
-      writer.skipField();
-    } else {
-      writer.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
-    }
-
-    if (i % 7 == 0) {
-      writer.skipField();
-    } else {
-      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
-    }
-
-    if (i % 8 == 0) {
-      writer.skipField();
-    } else {
-      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
-    }
-
-    if (i % 9 == 0) {
-      writer.skipField();
-    } else {
-      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
-    }
-
-    if (i % 10 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
-    }
-
-    if (i % 11 == 0) {
-      writer.skipField();
-    } else {
-      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
-    }
-
-    if (i % 12 == 0) {
-      writer.skipField();
-    } else {
-      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
-    }
-
-    writer.endRow();
-  }
-
-  public static void fillVTuple(int i, VTuple tuple) {
-    tuple.put(0, DatumFactory.createBool(i % 1 == 0));
-    tuple.put(1, DatumFactory.createInt2((short) 1));
-    tuple.put(2, DatumFactory.createInt4(i));
-    tuple.put(3, DatumFactory.createInt8(i));
-    tuple.put(4, DatumFactory.createFloat4(i));
-    tuple.put(5, DatumFactory.createFloat8(i));
-    tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes()));
-    tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
-    tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
-    tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
-    tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
-    tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
-    tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
-  }
-
-  public static void validateResults(OffHeapRowBlock rowBlock) {
-    long readStart = System.currentTimeMillis();
-    ZeroCopyTuple tuple = new ZeroCopyTuple();
-    int j = 0;
-    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
-    reader.reset();
-    while(reader.next(tuple)) {
-      validateTupleResult(j, tuple);
-      j++;
-    }
-    long readEnd = System.currentTimeMillis();
-    LOG.info("Reading takes " + (readEnd - readStart) + " msec");
-  }
-
-  public static void validateTupleResult(int j, Tuple t) {
-    assertTrue((j % 1 == 0) == t.getBool(0));
-    assertTrue(1 == t.getInt2(1));
-    assertEquals(j, t.getInt4(2));
-    assertEquals(j, t.getInt8(3));
-    assertTrue(j == t.getFloat4(4));
-    assertTrue(j == t.getFloat8(5));
-    assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6));
-    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
-    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
-    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
-    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
-    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
-  }
-
-  public static void validateNullity(int j, Tuple tuple) {
-    if (j == 0) {
-      tuple.isNull(0);
-    } else {
-      assertTrue((j % 1 == 0) == tuple.getBool(0));
-    }
-
-    if (j % 1 == 0) {
-      tuple.isNull(1);
-    } else {
-      assertTrue(1 == tuple.getInt2(1));
-    }
-
-    if (j % 2 == 0) {
-      tuple.isNull(2);
-    } else {
-      assertEquals(j, tuple.getInt4(2));
-    }
-
-    if (j % 3 == 0) {
-      tuple.isNull(3);
-    } else {
-      assertEquals(j, tuple.getInt8(3));
-    }
-
-    if (j % 4 == 0) {
-      tuple.isNull(4);
-    } else {
-      assertTrue(j == tuple.getFloat4(4));
-    }
-
-    if (j % 5 == 0) {
-      tuple.isNull(5);
-    } else {
-      assertTrue(j == tuple.getFloat8(5));
-    }
-
-    if (j % 6 == 0) {
-      tuple.isNull(6);
-    } else {
-      assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
-    }
-
-    if (j % 7 == 0) {
-      tuple.isNull(7);
-    } else {
-      assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
-    }
-
-    if (j % 8 == 0) {
-      tuple.isNull(8);
-    } else {
-      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
-    }
-
-    if (j % 9 == 0) {
-      tuple.isNull(9);
-    } else {
-      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
-    }
-
-    if (j % 10 == 0) {
-      tuple.isNull(10);
-    } else {
-      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
-    }
-
-    if (j % 11 == 0) {
-      tuple.isNull(11);
-    } else {
-      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
-    }
-
-    if (j % 12 == 0) {
-      tuple.isNull(12);
-    } else {
-      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
deleted file mode 100644
index 1eb9c17..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.unit.StorageUnit;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestResizableSpec {
-
-  @Test
-  public void testResizableLimit() {
-    ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
-
-    long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
-
-    assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
-
-    assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
-
-    assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
-
-    assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
-
-    assertFalse(limit.canIncrease(limit.limit()));
-  }
-
-  @Test
-  public void testFixedLimit() {
-    FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
-
-    assertEquals(limit.limit(), 100 * StorageUnit.MB);
-
-    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
-
-    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
-
-    assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
-
-    assertFalse(limit.canIncrease(limit.limit()));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
deleted file mode 100644
index 790d5a8..0000000
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<configuration>
-  <property>
-    <name>fs.s3.impl</name>
-    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
-  </property>
-
-  <!--- Registered Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
-  </property>
-
-  <!--- Fragment Class Configurations -->
-  <property>
-    <name>tajo.storage.fragment.textfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.csv.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-
-  <!--- Scanner Handler -->
-  <property>
-    <name>tajo.storage.scanner-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.scanner-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroScanner</value>
-  </property>
-
-  <!--- Appender Handler -->
-  <property>
-    <name>tajo.storage.appender-handler</name>
-    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.textfile.class</name>
-    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rcfile.class</name>
-    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.appender-handler.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroAppender</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
deleted file mode 100644
index 611b97f..0000000
--- a/tajo-storage/src/test/resources/testVariousTypes.avsc
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "type": "record",
-  "namespace": "org.apache.tajo",
-  "name": "testVariousTypes",
-  "fields": [
-    { "name": "col1", "type": "boolean" },
-    { "name": "col2", "type": "int" },
-    { "name": "col3", "type": "string" },
-    { "name": "col4", "type": "int" },
-    { "name": "col5", "type": "int" },
-    { "name": "col6", "type": "long" },
-    { "name": "col7", "type": "float" },
-    { "name": "col8", "type": "double" },
-    { "name": "col9", "type": "string" },
-    { "name": "col10", "type": "bytes" },
-    { "name": "col11", "type": "bytes" },
-    { "name": "col12", "type": "null" },
-    { "name": "col13", "type": "bytes" }
-  ]
-}
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml
new file mode 100644
index 0000000..c600b4b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/pom.xml
@@ -0,0 +1,337 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tajo-storage-common</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Storage Common</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/IndexProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-pmd-plugin</artifactId>
+        <version>2.7.1</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-json</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-hs</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
new file mode 100644
index 0000000..c5e96ac
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface Appender extends Closeable {
+
+  void init() throws IOException;
+
+  void addTuple(Tuple t) throws IOException;
+  
+  void flush() throws IOException;
+
+  long getEstimatedOutputSize() throws IOException;
+  
+  void close() throws IOException;
+
+  void enableStats();
+  
+  TableStats getStats();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..b829f60
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ * 
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  private final int[] sortKeyIds;
+  private final boolean[] asc;
+  @SuppressWarnings("unused")
+  private final boolean[] nullFirsts;  
+
+  private Datum left;
+  private Datum right;
+  private int compVal;
+
+  /**
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0, 
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+          
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i]= sortKeys[i].isNullFirst();
+    }
+  }
+
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = sortSepcProto.getColumnId();
+      asc[i] = sortSepcProto.getAscending();
+      nullFirsts[i] = sortSepcProto.getNullFirst();
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  @Override
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      left = tuple1.get(sortKeyIds[i]);
+      right = tuple2.get(sortKeyIds[i]);
+
+      if (left.isNull() || right.isNull()) {
+        if (!left.equals(right)) {
+          if (left.isNull()) {
+            compVal = 1;
+          } else if (right.isNull()) {
+            compVal = -1;
+          }
+          if (nullFirsts[i]) {
+            if (compVal != 0) {
+              compVal *= -1;
+            }
+          }
+        } else {
+          compVal = 0;
+        }
+      } else {
+        if (asc[i]) {
+          compVal = left.compareTo(right);
+        } else {
+          compVal = right.compareTo(left);
+        }
+      }
+
+      if (compVal < 0 || compVal > 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(sortKeyIds);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof BaseTupleComparator) {
+      BaseTupleComparator other = (BaseTupleComparator) obj;
+      if (sortKeyIds.length != other.sortKeyIds.length) {
+        return false;
+      }
+
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        if (sortKeyIds[i] != other.sortKeyIds[i] ||
+            asc[i] != other.asc[i] ||
+            nullFirsts[i] != other.nullFirsts[i]) {
+          return false;
+        }
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (int i = 0; i < sortSpecs.length; i++) {
+      builder.addSortSpecs(sortSpecs[i].getProto());
+    }
+
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+        .append(",Asc=").append(asc[i])
+        .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
new file mode 100644
index 0000000..00112e7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+@Deprecated
+public class BinarySerializerDeserializer implements SerializerDeserializer {
+
+  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+  @Override
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+      throws IOException {
+    byte[] bytes;
+    int length = 0;
+    if (datum == null || datum instanceof NullDatum) {
+      return 0;
+    }
+
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+      case BIT:
+      case CHAR:
+        bytes = datum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case INT2:
+        length = writeShort(out, datum.asInt2());
+        break;
+      case INT4:
+        length = writeVLong(out, datum.asInt4());
+        break;
+      case INT8:
+        length = writeVLong(out, datum.asInt8());
+        break;
+      case FLOAT4:
+        length = writeFloat(out, datum.asFloat4());
+        break;
+      case FLOAT8:
+        length = writeDouble(out, datum.asFloat8());
+        break;
+      case TEXT: {
+        bytes = datum.asTextBytes();
+        length = datum.size();
+        if (length == 0) {
+          bytes = INVALID_UTF__SINGLE_BYTE;
+          length = INVALID_UTF__SINGLE_BYTE.length;
+        }
+        out.write(bytes, 0, bytes.length);
+        break;
+      }
+      case BLOB:
+      case INET4:
+      case INET6:
+        bytes = datum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+        bytes = protobufDatum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case NULL_TYPE:
+        break;
+      default:
+        throw new IOException("Does not support type");
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+    if (length == 0) return NullDatum.get();
+
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        datum = DatumFactory.createBool(bytes[offset]);
+        break;
+      case BIT:
+        datum = DatumFactory.createBit(bytes[offset]);
+        break;
+      case CHAR: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+        datum = DatumFactory.createChar(chars);
+        break;
+      }
+      case INT2:
+        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
+        break;
+      case INT4:
+        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
+        break;
+      case INT8:
+        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
+        break;
+      case FLOAT4:
+        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
+        break;
+      case FLOAT8:
+        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
+        break;
+      case TEXT: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+
+        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
+          datum = DatumFactory.createText(new byte[0]);
+        } else {
+          datum = DatumFactory.createText(chars);
+        }
+        break;
+      }
+      case PROTOBUF: {
+        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+        Message.Builder builder = factory.newBuilder();
+        builder.mergeFrom(bytes, offset, length);
+        datum = factory.createDatum(builder);
+        break;
+      }
+      case INET4:
+        datum = DatumFactory.createInet4(bytes, offset, length);
+        break;
+      case BLOB:
+        datum = DatumFactory.createBlob(bytes, offset, length);
+        break;
+      default:
+        datum = NullDatum.get();
+    }
+    return datum;
+  }
+
+  private byte[] shortBytes = new byte[2];
+
+  public int writeShort(OutputStream out, short val) throws IOException {
+    shortBytes[0] = (byte) (val >> 8);
+    shortBytes[1] = (byte) val;
+    out.write(shortBytes, 0, 2);
+    return 2;
+  }
+
+  public float toFloat(byte[] bytes, int offset, int length) {
+    Preconditions.checkArgument(length == 4);
+
+    int val = ((bytes[offset] & 0x000000FF) << 24) +
+        ((bytes[offset + 1] & 0x000000FF) << 16) +
+        ((bytes[offset + 2] & 0x000000FF) << 8) +
+        (bytes[offset + 3] & 0x000000FF);
+    return Float.intBitsToFloat(val);
+  }
+
+  private byte[] floatBytes = new byte[4];
+
+  public int writeFloat(OutputStream out, float f) throws IOException {
+    int val = Float.floatToIntBits(f);
+
+    floatBytes[0] = (byte) (val >> 24);
+    floatBytes[1] = (byte) (val >> 16);
+    floatBytes[2] = (byte) (val >> 8);
+    floatBytes[3] = (byte) val;
+    out.write(floatBytes, 0, 4);
+    return floatBytes.length;
+  }
+
+  public double toDouble(byte[] bytes, int offset, int length) {
+    Preconditions.checkArgument(length == 8);
+    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
+        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
+        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
+        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
+        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
+        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
+        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
+        (long) (bytes[offset + 7] & 0x00000000000000FF);
+    return Double.longBitsToDouble(val);
+  }
+
+  private byte[] doubleBytes = new byte[8];
+
+  public int writeDouble(OutputStream out, double d) throws IOException {
+    long val = Double.doubleToLongBits(d);
+
+    doubleBytes[0] = (byte) (val >> 56);
+    doubleBytes[1] = (byte) (val >> 48);
+    doubleBytes[2] = (byte) (val >> 40);
+    doubleBytes[3] = (byte) (val >> 32);
+    doubleBytes[4] = (byte) (val >> 24);
+    doubleBytes[5] = (byte) (val >> 16);
+    doubleBytes[6] = (byte) (val >> 8);
+    doubleBytes[7] = (byte) val;
+    out.write(doubleBytes, 0, 8);
+    return doubleBytes.length;
+  }
+
+  private byte[] vLongBytes = new byte[9];
+
+  public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+    if (l >= -112 && l <= 127) {
+      bytes[offset] = (byte) l;
+      return 1;
+    }
+
+    int len = -112;
+    if (l < 0) {
+      l ^= -1L; // take one's complement'
+      len = -120;
+    }
+
+    long tmp = l;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    bytes[offset++] = (byte) len;
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+    }
+    return 1 + len;
+  }
+
+  public int writeVLong(OutputStream out, long l) throws IOException {
+    int len = writeVLongToByteArray(vLongBytes, 0, l);
+    out.write(vLongBytes, 0, len);
+    return len;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
new file mode 100644
index 0000000..85c79fa
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/* this class is PooledBuffer holder */
+public class BufferPool {
+
+  private static final PooledByteBufAllocator allocator;
+
+  private BufferPool() {
+  }
+
+  static {
+    //TODO we need determine the default params
+    allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+
+    /* if you are finding memory leak, please enable this line */
+    //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+  }
+
+  public static long maxDirectMemory() {
+    return PlatformDependent.maxDirectMemory();
+  }
+
+
+  public synchronized static ByteBuf directBuffer(int size) {
+    return allocator.directBuffer(size);
+  }
+
+  /**
+   *
+   * @param size the initial capacity
+   * @param max the max capacity
+   * @return allocated ByteBuf from pool
+   */
+  public static ByteBuf directBuffer(int size, int max) {
+    return allocator.directBuffer(size, max);
+  }
+
+  @InterfaceStability.Unstable
+  public static void forceRelease(ByteBuf buf) {
+    buf.release(buf.refCnt());
+  }
+
+  /**
+   * the ByteBuf will increase to writable size
+   * @param buf
+   * @param minWritableBytes required minimum writable size
+   */
+  public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
+    buf.ensureWritable(minWritableBytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
new file mode 100644
index 0000000..b1b6d65
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+
+public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel {
+
+  ByteBufferReadable byteBufferReadable;
+  ReadableByteChannel channel;
+  InputStream inputStream;
+
+  public ByteBufInputChannel(InputStream inputStream) {
+    if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) {
+      this.byteBufferReadable = (ByteBufferReadable) inputStream;
+    } else {
+      this.channel = Channels.newChannel(inputStream);
+    }
+
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts) {
+    return read(dsts, 0, dsts.length);
+  }
+
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    if (byteBufferReadable != null) {
+      return byteBufferReadable.read(dst);
+    } else {
+      return channel.read(dst);
+    }
+  }
+
+  @Override
+  protected void implCloseChannel() throws IOException {
+    IOUtils.cleanup(null, channel, inputStream);
+  }
+
+  public int available() throws IOException {
+    return inputStream.available();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
new file mode 100644
index 0000000..8841a31
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class DataLocation {
+  private String host;
+  private int volumeId;
+
+  public DataLocation(String host, int volumeId) {
+    this.host = host;
+    this.volumeId = volumeId;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getVolumeId() {
+    return volumeId;
+  }
+
+  @Override
+  public String toString() {
+    return "DataLocation{" +
+        "host=" + host +
+        ", volumeId=" + volumeId +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
new file mode 100644
index 0000000..2396349
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskDeviceInfo {
+	private int id;
+	private String name;
+	
+	private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
+
+	public DiskDeviceInfo(int id) {
+		this.id = id;
+	}
+	
+	public int getId() {
+		return id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+	
+	@Override
+	public String toString() {
+		return id + "," + name;
+	}
+
+	public void addMountPath(DiskMountInfo diskMountInfo) {
+		mountInfos.add(diskMountInfo);
+	}
+
+	public List<DiskMountInfo> getMountInfos() {
+		return mountInfos;
+	}
+
+	public void setMountInfos(List<DiskMountInfo> mountInfos) {
+		this.mountInfos = mountInfos;
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
new file mode 100644
index 0000000..22f18ba
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class DiskInfo {
+	private int id;
+	private String partitionName;
+	private String mountPath;
+	
+	private long capacity;
+	private long used;
+	
+	public DiskInfo(int id, String partitionName) {
+		this.id = id;
+		this.partitionName = partitionName;
+	}
+
+	public int getId() {
+		return id;
+	}
+
+	public void setId(int id) {
+		this.id = id;
+	}
+
+	public String getPartitionName() {
+		return partitionName;
+	}
+
+	public void setPartitionName(String partitionName) {
+		this.partitionName = partitionName;
+	}
+
+	public String getMountPath() {
+		return mountPath;
+	}
+
+	public void setMountPath(String mountPath) {
+		this.mountPath = mountPath;
+	}
+
+	public long getCapacity() {
+		return capacity;
+	}
+
+	public void setCapacity(long capacity) {
+		this.capacity = capacity;
+	}
+
+	public long getUsed() {
+		return used;
+	}
+
+	public void setUsed(long used) {
+		this.used = used;
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
new file mode 100644
index 0000000..aadb0e7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+
+public class DiskMountInfo implements Comparable<DiskMountInfo> {
+	private String mountPath;
+	
+	private long capacity;
+	private long used;
+	
+	private int deviceId;
+	
+	public DiskMountInfo(int deviceId, String mountPath) {
+		this.mountPath = mountPath;
+	}
+
+	public String getMountPath() {
+		return mountPath;
+	}
+
+	public void setMountPath(String mountPath) {
+		this.mountPath = mountPath;
+	}
+
+	public long getCapacity() {
+		return capacity;
+	}
+
+	public void setCapacity(long capacity) {
+		this.capacity = capacity;
+	}
+
+	public long getUsed() {
+		return used;
+	}
+
+	public void setUsed(long used) {
+		this.used = used;
+	}
+
+	public int getDeviceId() {
+		return deviceId;
+	}
+
+  @Override
+  public boolean equals(Object obj){
+    if (!(obj instanceof DiskMountInfo)) return false;
+
+    if (compareTo((DiskMountInfo) obj) == 0) return true;
+    else return false;
+  }
+
+  @Override
+  public int hashCode(){
+    return Objects.hashCode(mountPath);
+  }
+
+	@Override
+	public int compareTo(DiskMountInfo other) {
+		String path1 = mountPath;
+		String path2 = other.mountPath;
+		
+		int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
+		int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;
+		
+		if(path1Depth > path2Depth) {
+			return -1;
+		} else if(path1Depth < path2Depth) {
+			return 1;
+		} else {
+			int path1Length = path1.length();
+			int path2Length = path2.length();
+			
+			if(path1Length < path2Length) {
+				return 1;
+			} else if(path1Length > path2Length) {
+				return -1;
+			} else {
+				return path1.compareTo(path2);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
new file mode 100644
index 0000000..2d68870
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+public class DiskUtil {
+
+  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
+
+  public enum OSType {
+		OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+	}
+
+	static private OSType getOSType() {
+		String osName = System.getProperty("os.name");
+		if (osName.contains("Windows")
+				&& (osName.contains("XP") || osName.contains("2003")
+						|| osName.contains("Vista")
+						|| osName.contains("Windows_7")
+						|| osName.contains("Windows 7") || osName
+							.contains("Windows7"))) {
+			return OSType.OS_TYPE_WINXP;
+		} else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+			return OSType.OS_TYPE_SOLARIS;
+		} else if (osName.contains("Mac")) {
+			return OSType.OS_TYPE_MAC;
+		} else {
+			return OSType.OS_TYPE_UNIX;
+		}
+	}
+	
+	public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+		List<DiskDeviceInfo> deviceInfos;
+		
+		if(getOSType() == OSType.OS_TYPE_UNIX) {
+			deviceInfos = getUnixDiskDeviceInfos();
+			setDeviceMountInfo(deviceInfos);
+		} else {
+			deviceInfos = getDefaultDiskDeviceInfos();
+		}
+		
+		return deviceInfos;
+	}
+
+	private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+		
+		File file = new File(UNIX_DISK_DEVICE_PATH);
+		if(!file.exists()) {
+			System.out.println("No partition file:" + file.getAbsolutePath());
+			return getDefaultDiskDeviceInfos();
+		}
+		
+		BufferedReader reader = null;
+		try {
+			reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
+			String line = null;
+			
+			int count = 0;
+			Set<String> deviceNames = new TreeSet<String>();
+			while((line = reader.readLine()) != null) {
+				if(count > 0 && !line.trim().isEmpty()) {
+					String[] tokens = line.trim().split(" +");
+					if(tokens.length == 4) {
+						String deviceName = getDiskDeviceName(tokens[3]);
+						deviceNames.add(deviceName);
+					}
+				}
+				count++;
+			}
+			
+			int id = 0;
+			for(String eachDeviceName: deviceNames) {
+				DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+				diskDeviceInfo.setName(eachDeviceName);
+				
+				//TODO set addtional info
+				// /sys/block/sda/queue
+				infos.add(diskDeviceInfo);
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			if(reader != null) {
+				try {
+					reader.close();
+				} catch (IOException e) {
+				}
+			}
+		}
+		
+		return infos;
+	}
+	
+	private static String getDiskDeviceName(String partitionName) {
+		byte[] bytes = partitionName.getBytes();
+		
+		byte[] result = new byte[bytes.length];
+		int length = 0;
+		for(int i = 0; i < bytes.length; i++, length++) {
+			if(bytes[i] >= '0' && bytes[i] <= '9') {
+				break;
+			} else {
+				result[i] = bytes[i];
+			}
+		}
+		
+		return new String(result, 0, length);
+	}
+	
+	public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+		DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+		diskDeviceInfo.setName("default");
+		
+		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+		
+		infos.add(diskDeviceInfo);
+		
+		return infos;
+	}
+	
+	
+	private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+		Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+		for(DiskDeviceInfo eachDevice: deviceInfos) {
+			deviceMap.put(eachDevice.getName(), eachDevice);
+		}
+		
+		BufferedReader mountOutput = null;
+		try {
+			Process mountProcess = Runtime.getRuntime().exec("mount");
+			mountOutput = new BufferedReader(new InputStreamReader(
+					mountProcess.getInputStream()));
+			while (true) {
+				String line = mountOutput.readLine();
+				if (line == null) {
+					break;
+				}
+
+				int indexStart = line.indexOf(" on /");
+				int indexEnd = line.indexOf(" ", indexStart + 4);
+
+				String deviceName = line.substring(0, indexStart).trim();
+				String[] deviceNameTokens = deviceName.split("/");
+				if(deviceNameTokens.length == 3) {
+					if("dev".equals(deviceNameTokens[1])) {
+						String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+						String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+						
+						DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+						if(diskDeviceInfo != null) {
+							diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+						}
+					}
+				}
+			}
+		} catch (IOException e) {
+			throw e;
+		} finally {
+			if (mountOutput != null) {
+				mountOutput.close();
+			}
+		}
+	}
+
+  public static int getDataNodeStorageSize(){
+    return getStorageDirs().size();
+  }
+
+  public static List<URI> getStorageDirs(){
+    Configuration conf = new HdfsConfiguration();
+    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
+  }
+
+	public static void main(String[] args) throws Exception {
+		System.out.println("/dev/sde1".split("/").length);
+		for(String eachToken: "/dev/sde1".split("/")) {
+			System.out.println(eachToken);
+		}
+ 	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
new file mode 100644
index 0000000..7df4584
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+public interface FieldSerializerDeserializer {
+
+  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
+
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
new file mode 100644
index 0000000..8b7e2e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+
+/**
+ * An instance of FrameTuple is an immutable tuple.
+ * It contains two tuples and pretends to be one instance of Tuple for
+ * join qual evaluatations.
+ */
+public class FrameTuple implements Tuple, Cloneable {
+  private int size;
+  private int leftSize;
+  
+  private Tuple left;
+  private Tuple right;
+  
+  public FrameTuple() {}
+  
+  public FrameTuple(Tuple left, Tuple right) {
+    set(left, right);
+  }
+  
+  public void set(Tuple left, Tuple right) {
+    this.size = left.size() + right.size();
+    this.left = left;
+    this.leftSize = left.size();
+    this.right = right;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public boolean contains(int fieldId) {
+    Preconditions.checkArgument(fieldId < size, 
+        "Out of field access: " + fieldId);
+    
+    if (fieldId < leftSize) {
+      return left.contains(fieldId);
+    } else {
+      return right.contains(fieldId - leftSize);
+    }
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
+  }
+
+  @Override
+  public void clear() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    throw new UnsupportedException();
+  }
+  
+  @Override
+  public long getOffset() {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public void put(Datum [] values) {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    Preconditions.checkArgument(fieldId < size, 
+        "Out of field access: " + fieldId);
+    
+    if (fieldId < leftSize) {
+      return left.get(fieldId);
+    } else {
+      return right.get(fieldId - leftSize);
+    }
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return get(fieldId).asBool();
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return get(fieldId).asByte();
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return get(fieldId).asChar();
+  }
+
+  @Override
+  public byte [] getBytes(int fieldId) {
+    return get(fieldId).asByteArray();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return get(fieldId).asInt2();
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return get(fieldId).asInt4();
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return get(fieldId).asInt8();
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return get(fieldId).asFloat4();
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return get(fieldId).asFloat8();
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return get(fieldId).asChars();
+  }
+
+  @Override
+  public ProtobufDatum getProtobufDatum(int fieldId) {
+    return (ProtobufDatum) get(fieldId);
+  }
+
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
+  public char [] getUnicodeChars(int fieldId) {
+    return get(fieldId).asUnicodeChars();
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    FrameTuple frameTuple = (FrameTuple) super.clone();
+    frameTuple.set(this.left.clone(), this.right.clone());
+    return frameTuple;
+  }
+
+  @Override
+  public Datum[] getValues(){
+    throw new UnsupportedException();
+  }
+
+  public String toString() {
+    boolean first = true;
+    StringBuilder str = new StringBuilder();
+    str.append("(");
+    for(int i=0; i < size(); i++) {      
+      if(contains(i)) {
+        if(first) {
+          first = false;
+        } else {
+          str.append(", ");
+        }
+        str.append(i)
+        .append("=>")
+        .append(get(i));
+      }
+    }
+    str.append(")");
+    return str.toString();
+  }
+}


Mime
View raw message