tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [14/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
Date Wed, 03 Dec 2014 05:30:29 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
new file mode 100644
index 0000000..bfbe478
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.util.Arrays;
+
+public class LazyTuple implements Tuple, Cloneable {
+  private long offset;
+  private Datum[] values;
+  private byte[][] textBytes;
+  private Schema schema;
+  private byte[] nullBytes;
+  private SerializerDeserializer serializeDeserialize;
+
+  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
+    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
+  }
+
+  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
+    this.schema = schema;
+    this.textBytes = textBytes;
+    this.values = new Datum[schema.size()];
+    this.offset = offset;
+    this.nullBytes = nullBytes;
+    this.serializeDeserialize = serde;
+  }
+
+  public LazyTuple(LazyTuple tuple) {
+    this.values = tuple.getValues();
+    this.offset = tuple.offset;
+    this.schema = tuple.schema;
+    this.textBytes = new byte[size()][];
+    this.nullBytes = tuple.nullBytes;
+    this.serializeDeserialize = tuple.serializeDeserialize;
+  }
+
+  @Override
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * Tells whether the given field currently holds something: either an already
+   * materialized datum or raw bytes that have not been deserialized yet.
+   *
+   * @param fieldid zero-based field index
+   * @return true if a datum or pending raw bytes exist for the field
+   */
+  @Override
+  public boolean contains(int fieldid) {
+    if (values[fieldid] != null) {
+      return true;
+    }
+    // textBytes may hold fewer entries than the tuple has fields (get() checks the
+    // same condition), so test the bound before indexing.
+    return fieldid < textBytes.length && textBytes[fieldid] != null;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
+  }
+
+  /**
+   * Drops every materialized datum and every pending raw byte array.
+   */
+  @Override
+  public void clear() {
+    // The two arrays are not guaranteed to have the same length (textBytes can be
+    // shorter than the tuple), so each one is wiped over its own length.
+    Arrays.fill(values, null);
+    Arrays.fill(textBytes, null);
+  }
+
+  //////////////////////////////////////////////////////
+  // Setter
+  //////////////////////////////////////////////////////
+  @Override
+  public void put(int fieldId, Datum value) {
+    values[fieldId] = value;
+    textBytes[fieldId] = null;
+  }
+
+  /**
+   * Copies the given datums into consecutive fields starting at fieldId.
+   *
+   * @param fieldId index of the first field to overwrite
+   * @param values datums to store; values[j] is written to field fieldId + j
+   */
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
+      this.values[i] = values[j];
+      // Invalidate only the raw bytes of the overwritten field. Replacing the whole
+      // textBytes array with one sized by the argument would discard the pending
+      // bytes of untouched fields and could leave the array shorter than the tuple.
+      if (i < textBytes.length) {
+        textBytes[i] = null;
+      }
+    }
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
+      values[i] = tuple.get(j);
+      textBytes[i] = null;
+    }
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    System.arraycopy(values, 0, this.values, 0, size());
+    this.textBytes = new byte[values.length][];
+  }
+
+  //////////////////////////////////////////////////////
+  // Getter
+  //////////////////////////////////////////////////////
+  @Override
+  public Datum get(int fieldId) {
+    if (values[fieldId] != null)
+      return values[fieldId];
+    else if (textBytes.length <= fieldId) {
+      values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
+    } else if (textBytes[fieldId] != null) {
+      try {
+        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
+      } catch (Exception e) {
+        values[fieldId] = NullDatum.get();
+      }
+      textBytes[fieldId] = null;
+    } else {
+      //non-projection
+    }
+    return values[fieldId];
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  @Override
+  public long getOffset() {
+    return this.offset;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return get(fieldId).asBool();
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return get(fieldId).asByte();
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return get(fieldId).asChar();
+  }
+
+  @Override
+  public byte [] getBytes(int fieldId) {
+    return get(fieldId).asByteArray();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return get(fieldId).asInt2();
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return get(fieldId).asInt4();
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return get(fieldId).asInt8();
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return get(fieldId).asFloat4();
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return get(fieldId).asFloat8();
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return get(fieldId).asChars();
+  }
+
+  @Override
+  public ProtobufDatum getProtobufDatum(int fieldId) {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return get(fieldId).asUnicodeChars();
+  }
+
+  public String toString() {
+    boolean first = true;
+    StringBuilder str = new StringBuilder();
+    str.append("(");
+    Datum d;
+    for (int i = 0; i < values.length; i++) {
+      d = get(i);
+      if (d != null) {
+        if (first) {
+          first = false;
+        } else {
+          str.append(", ");
+        }
+        str.append(i)
+            .append("=>")
+            .append(d);
+      }
+    }
+    str.append(")");
+    return str.toString();
+  }
+
+  /**
+   * Hashes the fully materialized values so that the result agrees with
+   * equals(), which compares getValues() as well.
+   *
+   * @return hash code computed over the datums returned by getValues()
+   */
+  @Override
+  public int hashCode() {
+    // Hashing the raw 'values' array would give different codes for the same tuple
+    // before and after its fields are lazily deserialized.
+    return Arrays.hashCode(getValues());
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum[] datums = new Datum[values.length];
+    for (int i = 0; i < values.length; i++) {
+      datums[i] = get(i);
+    }
+    return datums;
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    LazyTuple lazyTuple = (LazyTuple) super.clone();
+
+    lazyTuple.values = getValues(); //shallow copy
+    lazyTuple.textBytes = new byte[size()][];
+    return lazyTuple;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Tuple) {
+      Tuple other = (Tuple) obj;
+      return Arrays.equals(getValues(), other.getValues());
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
new file mode 100644
index 0000000..f19b61f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.ClassSize;
+
+public class MemoryUtil {
+
+  /** Overhead for a NullDatum */
+  public static final long NULL_DATUM;
+
+  /** Overhead for a BoolDatum */
+  public static final long BOOL_DATUM;
+
+  /** Overhead for a CharDatum */
+  public static final long CHAR_DATUM;
+
+  /** Overhead for a BitDatum */
+  public static final long BIT_DATUM;
+
+  /** Overhead for an Int2Datum */
+  public static final long INT2_DATUM;
+
+  /** Overhead for an Int4Datum */
+  public static final long INT4_DATUM;
+
+  /** Overhead for an Int8Datum */
+  public static final long INT8_DATUM;
+
+  /** Overhead for a Float4Datum */
+  public static final long FLOAT4_DATUM;
+
+  /** Overhead for a Float8Datum */
+  public static final long FLOAT8_DATUM;
+
+  /** Overhead for a TextDatum */
+  public static final long TEXT_DATUM;
+
+  /** Overhead for a BlobDatum */
+  public static final long BLOB_DATUM;
+
+  /** Overhead for a DateDatum */
+  public static final long DATE_DATUM;
+
+  /** Overhead for a TimeDatum */
+  public static final long TIME_DATUM;
+
+  /** Overhead for a TimestampDatum */
+  public static final long TIMESTAMP_DATUM;
+
+  static {
+    NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
+
+    CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
+
+    BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
+
+    BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
+
+    INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
+
+    INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
+
+    INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
+
+    FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
+
+    FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
+
+    TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
+
+    BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
+
+    DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
+
+    TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
+
+    TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
+  }
+
+  /**
+   * Estimates the memory taken by a tuple by summing ClassSize.OBJECT and the
+   * per-type overhead constant of every datum returned by tuple.getValues().
+   * CHAR and TEXT additionally add datum.size().
+   *
+   * @param tuple the tuple to measure
+   * @return the estimated size
+   */
+  public static long calculateMemorySize(Tuple tuple) {
+    long total = ClassSize.OBJECT;
+    for (Datum datum : tuple.getValues()) {
+      switch (datum.type()) {
+
+      case NULL_TYPE:
+        total += NULL_DATUM;
+        break;
+
+      case BOOLEAN:
+        total += BOOL_DATUM;
+        break;
+
+      case BIT:
+        total += BIT_DATUM;
+        break;
+
+      case CHAR:
+        total += CHAR_DATUM + datum.size();
+        break;
+
+      case INT1:
+      case INT2:
+        total += INT2_DATUM;
+        break;
+
+      case INT4:
+        total += INT4_DATUM;
+        break;
+
+      case INT8:
+        total += INT8_DATUM;
+        break;
+
+      case FLOAT4:
+        total += FLOAT4_DATUM;
+        break;
+
+      case FLOAT8:
+        // Use the Float8Datum overhead; this case previously added FLOAT4_DATUM.
+        total += FLOAT8_DATUM;
+        break;
+
+      case TEXT:
+        total += TEXT_DATUM + datum.size();
+        break;
+
+      case DATE:
+        total += DATE_DATUM;
+        break;
+
+      case TIME:
+        total += TIME_DATUM;
+        break;
+
+      case TIMESTAMP:
+        total += TIMESTAMP_DATUM;
+        break;
+
+      default:
+        // Every other type contributes nothing to the estimate.
+        // NOTE(review): BLOB_DATUM is declared in this class but there is no BLOB
+        // case, so blobs land here - confirm whether that is intended.
+        break;
+      }
+    }
+
+    return total;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
new file mode 100644
index 0000000..66b3667
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MergeScanner implements Scanner {
+  private Configuration conf;
+  private TableMeta meta;
+  private Schema schema;
+  private List<Fragment> fragments;
+  private Iterator<Fragment> iterator;
+  private Fragment currentFragment;
+  private Scanner currentScanner;
+  private Tuple tuple;
+  private boolean projectable = false;
+  private boolean selectable = false;
+  private Schema target;
+  private float progress;
+  protected TableStats tableStats;
+
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList)
+      throws IOException {
+    this(conf, schema, meta, rawFragmentList, schema);
+  }
+
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList,
+                      Schema target)
+      throws IOException {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.target = target;
+
+    this.fragments = new ArrayList<Fragment>();
+
+    long numBytes = 0;
+    for (Fragment eachFileFragment: rawFragmentList) {
+      long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment);
+      if (fragmentLength > 0) {
+        numBytes += fragmentLength;
+        fragments.add(eachFileFragment);
+      }
+    }
+
+    // it should keep the input order. Otherwise, it causes wrong result of sort queries.
+    this.reset();
+
+    if (currentScanner != null) {
+      this.projectable = currentScanner.isProjectable();
+      this.selectable = currentScanner.isSelectable();
+    }
+
+    tableStats = new TableStats();
+
+    tableStats.setNumBytes(numBytes);
+    tableStats.setNumBlocks(fragments.size());
+
+    for(Column eachColumn: schema.getColumns()) {
+      ColumnStats columnStats = new ColumnStats(eachColumn);
+      tableStats.addColumnStat(columnStats);
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+    progress = 0.0f;
+  }
+
+  /**
+   * Returns the next tuple, moving through the fragments in their input order.
+   * When the scanner of the current fragment is exhausted it is closed, its read
+   * bytes and row count are added to tableStats, and the scanner of the following
+   * fragment takes over.
+   *
+   * @return the next tuple, or null when there is no scanner left to read from
+   * @throws IOException if a fragment scanner fails to read, close or open
+   */
+  @Override
+  public Tuple next() throws IOException {
+    while (currentScanner != null) {
+      tuple = currentScanner.next();
+      if (tuple != null) {
+        return tuple;
+      }
+
+      // The current fragment is exhausted: close it and fold its statistics in.
+      currentScanner.close();
+      TableStats scannerTableStats = currentScanner.getInputStats();
+      if (scannerTableStats != null) {
+        tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStats.getReadBytes());
+        tableStats.setNumRows(tableStats.getNumRows() + scannerTableStats.getNumRows());
+      }
+
+      // Keep advancing rather than trying only one more scanner: a fragment that
+      // yields no tuple must not end the scan while later fragments remain.
+      currentScanner = getNextScanner();
+    }
+
+    // No scanner is left (all fragments consumed, or close() was called); make
+    // sure a previously returned tuple is not handed out again.
+    tuple = null;
+    return null;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    this.iterator = fragments.iterator();
+    if (currentScanner != null) {
+      currentScanner.close();
+    }
+    this.currentScanner = getNextScanner();
+  }
+
+  private Scanner getNextScanner() throws IOException {
+    if (iterator.hasNext()) {
+      currentFragment = iterator.next();
+      currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema,
+          currentFragment, target);
+      currentScanner.init();
+      return currentScanner;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(currentScanner != null) {
+      currentScanner.close();
+      currentScanner = null;
+    }
+    iterator = null;
+    progress = 1.0f;
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return projectable;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    this.target = new Schema(targets);
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return selectable;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+
+  /**
+   * Reports the scan progress as the ratio of bytes read so far (bytes of finished
+   * fragments plus bytes read by the current scanner) to the total number of bytes.
+   * Falls back to the stored progress value when no scanner is active, the iterator
+   * has been released, or the total number of bytes is not positive.
+   *
+   * @return the progress ratio, or the stored progress value
+   */
+  @Override
+  public float getProgress() {
+    boolean scanning = currentScanner != null && iterator != null && tableStats.getNumBytes() > 0;
+    if (!scanning) {
+      return progress;
+    }
+
+    long inFlightBytes = 0;
+    TableStats currentStats = currentScanner.getInputStats();
+    if (currentStats != null) {
+      inFlightBytes = currentStats.getReadBytes();
+    }
+
+    return (float)(tableStats.getReadBytes() + inFlightBytes) / (float)tableStats.getNumBytes();
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
new file mode 100644
index 0000000..4272228
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -0,0 +1,109 @@
+package org.apache.tajo.storage; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+/**
+ * A {@link Scanner} that never produces a tuple: {@link #next()} always returns
+ * null. It only keeps the objects it was constructed with, a progress value and
+ * a {@link TableStats} instance.
+ */
+public class NullScanner implements Scanner {
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final Fragment fragment;
+  // Number of columns of the schema, captured at construction time.
+  protected final int columnNum;
+  // Stored by setTarget(); not read anywhere in this class.
+  protected Column [] targets;
+  // 0.0f after init()/reset(), 1.0f after next()/close().
+  protected float progress;
+  protected TableStats tableStats;
+
+  /**
+   * Stores the given arguments and creates an empty TableStats.
+   *
+   * @param conf configuration kept for subclasses
+   * @param schema schema returned by getSchema(); also determines columnNum
+   * @param meta table meta kept for subclasses
+   * @param fragment fragment kept for subclasses
+   */
+  public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = schema;
+    this.fragment = fragment;
+    this.tableStats = new TableStats();
+    this.columnNum = this.schema.size();
+  }
+
+  /** Resets the progress to 0 and sets the byte and block counts of the stats to 0. */
+  @Override
+  public void init() throws IOException {
+    progress = 0.0f;
+    tableStats.setNumBytes(0);
+    tableStats.setNumBlocks(0);
+  }
+
+  /**
+   * Marks the scan as complete and yields nothing.
+   *
+   * @return always null
+   */
+  @Override
+  public Tuple next() throws IOException {
+    progress = 1.0f;
+    return null;
+  }
+
+  /** Sets the progress back to 0. */
+  @Override
+  public void reset() throws IOException {
+    progress = 0.0f;
+  }
+
+  /** Sets the progress to 1; no resource is released here. */
+  @Override
+  public void close() throws IOException {
+    progress = 1.0f;
+  }
+
+  /** @return always false */
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  /** Remembers the target columns in the targets field. */
+  @Override
+  public void setTarget(Column[] targets) {
+    this.targets = targets;
+  }
+
+  /** @return always true */
+  @Override
+  public boolean isSelectable() {
+    return true;
+  }
+
+  /** Intentionally empty: the given expression is ignored. */
+  @Override
+  public void setSearchCondition(Object expr) {
+
+  }
+
+  /** @return always true */
+  @Override
+  public boolean isSplittable() {
+    return true;
+  }
+
+  /** @return the current progress value */
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  /** @return the stats object created in the constructor */
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
+
+  /** @return the schema passed to the constructor */
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
new file mode 100644
index 0000000..94d13ee
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Comparator;
+
+/**
+ * Orders paths by the integer value parsed from {@link Path#getName()}.
+ */
+public class NumericPathComparator implements Comparator<Path> {
+
+  /**
+   * Compares two paths by the int value of their names.
+   *
+   * @param p1 first path; its name must be parsable by Integer.parseInt
+   * @param p2 second path; its name must be parsable by Integer.parseInt
+   * @return a negative value, zero, or a positive value when the number of p1 is
+   *         less than, equal to, or greater than the number of p2
+   * @throws NumberFormatException if either name is not a valid int
+   */
+  @Override
+  public int compare(Path p1, Path p2) {
+    int num1 = Integer.parseInt(p1.getName());
+    int num2 = Integer.parseInt(p2.getName());
+
+    // Compare explicitly instead of returning num1 - num2: the subtraction
+    // overflows for operands of opposite sign and large magnitude, which yields
+    // a result with the wrong sign.
+    return num1 < num2 ? -1 : (num1 == num2 ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
new file mode 100644
index 0000000..24b6280
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.exception.UnknownDataTypeException;
+import org.apache.tajo.tuple.offheap.RowWriter;
+import org.apache.tajo.util.BitArray;
+
+import java.nio.ByteBuffer;
+
+public class RowStoreUtil {
+  /**
+   * Looks up, for every column of outSchema, the column id that inSchema reports
+   * for the column's qualified name.
+   *
+   * @param inSchema schema in which the ids are resolved
+   * @param outSchema schema whose columns are resolved, in iteration order
+   * @return one id per column of outSchema
+   */
+  public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
+    int[] ids = new int[outSchema.size()];
+    int next = 0;
+    for (Column outColumn : outSchema.getColumns()) {
+      ids[next++] = inSchema.getColumnId(outColumn.getQualifiedName());
+    }
+    return ids;
+  }
+
+  public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
+    out.clear();
+    for (int idx = 0; idx < targetIds.length; idx++) {
+      out.put(idx, in.get(targetIds[idx]));
+    }
+    return out;
+  }
+
+  public static RowStoreEncoder createEncoder(Schema schema) {
+    return new RowStoreEncoder(schema);
+  }
+
+  public static RowStoreDecoder createDecoder(Schema schema) {
+    return new RowStoreDecoder(schema);
+  }
+
+  public static class RowStoreDecoder {
+
+    private Schema schema;
+    private BitArray nullFlags;
+    private int headerSize;
+
+    private RowStoreDecoder(Schema schema) {
+      this.schema = schema;
+      nullFlags = new BitArray(schema.size());
+      headerSize = nullFlags.bytesLength();
+    }
+
+
+    public Tuple toTuple(byte [] bytes) {
+      nullFlags.clear();
+      ByteBuffer bb = ByteBuffer.wrap(bytes);
+      Tuple tuple = new VTuple(schema.size());
+      Column col;
+      TajoDataTypes.DataType type;
+
+      bb.limit(headerSize);
+      nullFlags.fromByteBuffer(bb);
+      bb.limit(bytes.length);
+
+      for (int i =0; i < schema.size(); i++) {
+        if (nullFlags.get(i)) {
+          tuple.put(i, DatumFactory.createNullDatum());
+          continue;
+        }
+
+        col = schema.getColumn(i);
+        type = col.getDataType();
+        switch (type.getType()) {
+          case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
+          case BIT:
+            byte b = bb.get();
+            tuple.put(i, DatumFactory.createBit(b));
+            break;
+
+          case CHAR:
+            byte c = bb.get();
+            tuple.put(i, DatumFactory.createChar(c));
+            break;
+
+          case INT2:
+            short s = bb.getShort();
+            tuple.put(i, DatumFactory.createInt2(s));
+            break;
+
+          case INT4:
+          case DATE:
+            int i_ = bb.getInt();
+            tuple.put(i, DatumFactory.createFromInt4(type, i_));
+            break;
+
+          case INT8:
+          case TIME:
+          case TIMESTAMP:
+            long l = bb.getLong();
+            tuple.put(i, DatumFactory.createFromInt8(type, l));
+            break;
+
+        case INTERVAL:
+            int month  = bb.getInt();
+            long milliseconds  = bb.getLong();
+            tuple.put(i, new IntervalDatum(month, milliseconds));
+            break;
+
+          case FLOAT4:
+            float f = bb.getFloat();
+            tuple.put(i, DatumFactory.createFloat4(f));
+            break;
+
+          case FLOAT8:
+            double d = bb.getDouble();
+            tuple.put(i, DatumFactory.createFloat8(d));
+            break;
+
+          case TEXT:
+            byte [] _string = new byte[bb.getInt()];
+            bb.get(_string);
+            tuple.put(i, DatumFactory.createText(_string));
+            break;
+
+          case BLOB:
+            byte [] _bytes = new byte[bb.getInt()];
+            bb.get(_bytes);
+            tuple.put(i, DatumFactory.createBlob(_bytes));
+            break;
+
+          case INET4:
+            byte [] _ipv4 = new byte[4];
+            bb.get(_ipv4);
+            tuple.put(i, DatumFactory.createInet4(_ipv4));
+            break;
+          case INET6:
+            // TODO - to be implemented
+            throw new UnsupportedException(type.getType().name());
+          default:
+            throw new RuntimeException(new UnknownDataTypeException(type.getType().name()));
+        }
+      }
+      return tuple;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+  }
+
+  /**
+   * Serializes a {@link Tuple} into a row-store byte array: a header holding the null flags
+   * (sized from the schema's column count) followed by the encoded values of the non-null columns,
+   * in schema order.
+   *
+   * The null-flag bit array is a field that is cleared and refilled on every
+   * {@link #toBytes(Tuple)} call, so one encoder instance must not be used by several threads
+   * at the same time.
+   */
+  public static class RowStoreEncoder {
+    private final Schema schema;
+    private final BitArray nullFlags;
+    private final int headerSize;
+
+    private RowStoreEncoder(Schema schema) {
+      this.schema = schema;
+      nullFlags = new BitArray(schema.size());
+      headerSize = nullFlags.bytesLength();
+    }
+
+    /**
+     * Encodes the given tuple according to this encoder's schema.
+     *
+     * @param tuple the tuple to encode; it is read by column index for every schema column
+     * @return a newly allocated array containing the null-flag header followed by the column values
+     * @throws RuntimeException wrapping an UnknownDataTypeException if a column has a type
+     *         that this encoder does not handle
+     */
+    public byte[] toBytes(Tuple tuple) {
+      nullFlags.clear();
+      int size = estimateTupleDataSize(tuple);
+      ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
+      // Leave room for the null flags; they are only known after all columns were visited.
+      bb.position(headerSize);
+      for (int i = 0; i < schema.size(); i++) {
+        if (tuple.isNull(i)) {
+          nullFlags.set(i);
+          continue;
+        }
+
+        Column col = schema.getColumn(i);
+        switch (col.getDataType().getType()) {
+        case NULL_TYPE:
+          // A NULL_TYPE column never carries a payload; it is only recorded in the null flags.
+          nullFlags.set(i);
+          break;
+        case BOOLEAN:
+        case BIT:
+        case CHAR:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case INT2:
+          bb.putShort(tuple.get(i).asInt2());
+          break;
+        case INT4:
+        case DATE:
+          bb.putInt(tuple.get(i).asInt4());
+          break;
+        case INT8:
+        case TIME:
+        case TIMESTAMP:
+          bb.putLong(tuple.get(i).asInt8());
+          break;
+        case FLOAT4:
+          bb.putFloat(tuple.get(i).asFloat4());
+          break;
+        case FLOAT8:
+          bb.putDouble(tuple.get(i).asFloat8());
+          break;
+        case TEXT:
+        case BLOB:
+          // Variable-length values are written as a 4-byte length followed by the raw bytes.
+          byte[] bytes = tuple.get(i).asByteArray();
+          bb.putInt(bytes.length);
+          bb.put(bytes);
+          break;
+        case INTERVAL:
+          IntervalDatum interval = (IntervalDatum) tuple.get(i);
+          bb.putInt(interval.getMonths());
+          bb.putLong(interval.getMilliSeconds());
+          break;
+        case INET4:
+        case INET6:
+          // Written without a length prefix.
+          bb.put(tuple.get(i).asByteArray());
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        }
+      }
+
+      // Go back and fill in the header, then restore the end-of-data position.
+      byte[] flags = nullFlags.toArray();
+      int finalPosition = bb.position();
+      bb.position(0);
+      bb.put(flags);
+
+      bb.position(finalPosition);
+      bb.flip();
+      byte[] buf = new byte[bb.limit()];
+      bb.get(buf);
+      return buf;
+    }
+
+    /**
+     * Computes how many bytes the non-null column values of the tuple need, plus a fixed
+     * reservation of 100 bytes. Null values are skipped because they only occupy a bit
+     * in the null-flag header.
+     *
+     * @param tuple the tuple that is about to be encoded
+     * @return the number of payload bytes to allocate (header not included)
+     * @throws RuntimeException wrapping an UnknownDataTypeException for an unhandled column type
+     */
+    private int estimateTupleDataSize(Tuple tuple) {
+      int size = 0;
+
+      for (int i = 0; i < schema.size(); i++) {
+        if (tuple.isNull(i)) {
+          continue;
+        }
+
+        Column col = schema.getColumn(i);
+        switch (col.getDataType().getType()) {
+        case NULL_TYPE:
+          // toBytes() writes no payload for NULL_TYPE columns, so they must not be
+          // rejected here as an unknown type.
+          break;
+        case BOOLEAN:
+        case BIT:
+        case CHAR:
+          size += 1;
+          break;
+        case INT2:
+          size += 2;
+          break;
+        case DATE:
+        case INT4:
+        case FLOAT4:
+          size += 4;
+          break;
+        case TIME:
+        case TIMESTAMP:
+        case INT8:
+        case FLOAT8:
+          size += 8;
+          break;
+        case INTERVAL:
+          // 4 bytes of months + 8 bytes of milliseconds
+          size += 12;
+          break;
+        case TEXT:
+        case BLOB:
+          // 4-byte length prefix + raw bytes
+          size += (4 + tuple.get(i).asByteArray().length);
+          break;
+        case INET4:
+        case INET6:
+          size += tuple.get(i).asByteArray().length;
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        }
+      }
+
+      size += 100; // optimistic reservation
+
+      return size;
+    }
+
+    /**
+     * @return the schema this encoder was created for
+     */
+    public Schema getSchema() {
+      return schema;
+    }
+  }
+
+  /**
+   * Writes all fields of the given tuple to the row writer as one row.
+   * The number and types of the fields are taken from {@code writer.dataTypes()}.
+   * Null fields and NULL_TYPE fields are skipped via {@code writer.skipField()}.
+   *
+   * @param tuple the tuple to read the field values from
+   * @param writer the row writer that receives the values
+   * @throws UnsupportedException if a field has a data type that is not handled here
+   */
+  public static void convert(Tuple tuple, RowWriter writer) {
+    writer.startRow();
+
+    final int fieldCount = writer.dataTypes().length;
+    for (int fieldIdx = 0; fieldIdx < fieldCount; fieldIdx++) {
+      if (tuple.isNull(fieldIdx)) {
+        writer.skipField();
+      } else {
+        switch (writer.dataTypes()[fieldIdx].getType()) {
+        case NULL_TYPE:
+          writer.skipField();
+          break;
+        case BOOLEAN:
+          writer.putBool(tuple.getBool(fieldIdx));
+          break;
+        case INT1:
+        case INT2:
+          writer.putInt2(tuple.getInt2(fieldIdx));
+          break;
+        case INT4:
+        case DATE:
+        case INET4:
+          writer.putInt4(tuple.getInt4(fieldIdx));
+          break;
+        case INT8:
+        case TIMESTAMP:
+        case TIME:
+          writer.putInt8(tuple.getInt8(fieldIdx));
+          break;
+        case FLOAT4:
+          writer.putFloat4(tuple.getFloat4(fieldIdx));
+          break;
+        case FLOAT8:
+          writer.putFloat8(tuple.getFloat8(fieldIdx));
+          break;
+        case TEXT:
+          writer.putText(tuple.getBytes(fieldIdx));
+          break;
+        case INTERVAL:
+          writer.putInterval((IntervalDatum) tuple.getInterval(fieldIdx));
+          break;
+        case PROTOBUF:
+          writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(fieldIdx));
+          break;
+        default:
+          throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[fieldIdx]);
+        }
+      }
+    }
+
+    writer.endRow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
new file mode 100644
index 0000000..0356b19
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Scanner interface. A scanner returns tuples one at a time and can optionally
+ * perform projection and selection in the underlying scanner layer.
+ */
+public interface Scanner extends SchemaObject, Closeable {
+
+  /**
+   * Initializes this scanner.
+   * NOTE(review): presumably this has to be called before {@link #next()} - confirm with the implementations.
+   *
+   * @throws java.io.IOException if an internal I/O error occurs during initialization
+   */
+  void init() throws IOException;
+
+  /**
+   * Returns one tuple per call.
+   *
+   * @return the next tuple, or null if the scanner has no more tuples
+   *
+   * @throws java.io.IOException if an internal I/O error occurs during the next method
+   */
+  Tuple next() throws IOException;
+
+  /**
+   * Resets the cursor. After this call the scanner
+   * retrieves the first tuple again.
+   *
+   * @throws java.io.IOException if an internal I/O error occurs during the reset method
+   */
+  void reset() throws IOException;
+
+  /**
+   * Closes the scanner.
+   *
+   * @throws java.io.IOException if an internal I/O error occurs during the close method
+   */
+  void close() throws IOException;
+
+
+  /**
+   * Tells whether the projection is executed in the underlying scanner layer.
+   *
+   * @return true if this scanner can project the given columns.
+   */
+  boolean isProjectable();
+
+  /**
+   * Sets the target columns.
+   * @param targets columns to be projected
+   */
+  void setTarget(Column[] targets);
+
+  /**
+   * Tells whether the selection is executed in the underlying scanner layer.
+   *
+   * @return true if this scanner can filter tuples against a given condition.
+   */
+  boolean isSelectable();
+
+  /**
+   * Sets a search condition.
+   * @param expr the condition to be searched for
+   *
+   * TODO - replace the Object parameter type with a concrete expression type
+   */
+  void setSearchCondition(Object expr);
+
+  /**
+   * Tells whether the file is splittable.
+   *
+   * @return true if this scanner can split a file.
+   */
+  boolean isSplittable();
+
+  /**
+   * Tells how much of the input the scanner has consumed.
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
+   */
+  float getProgress();
+
+  /**
+   * @return statistics about the input of this scanner
+   *         (NOTE(review): which fields of TableStats are filled is implementation specific - confirm)
+   */
+  TableStats getInputStats();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
new file mode 100644
index 0000000..894e7ee
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.io.IOException;
+
+/**
+ * A {@link Scanner} that additionally supports positioning by offset.
+ */
+public interface SeekableScanner extends Scanner {
+
+  /**
+   * @return an offset describing the scanner's position
+   *         (NOTE(review): presumably the start offset of the tuple that the following
+   *         next() call returns - confirm with the implementations)
+   * @throws IOException if an internal I/O error occurs
+   */
+  long getNextOffset() throws IOException;
+
+  /**
+   * Repositions the scanner.
+   *
+   * @param offset the offset to move to (presumably a value obtained from {@link #getNextOffset()})
+   * @throws IOException if an internal I/O error occurs
+   */
+  void seek(long offset) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..564a9f5
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Converts between {@link Datum} values and their serialized byte form for a given column.
+ * This interface is deprecated.
+ */
+@Deprecated
+public interface SerializerDeserializer {
+
+  /**
+   * Serializes one datum to the output stream.
+   *
+   * @param col the column the datum belongs to
+   * @param datum the value to serialize
+   * @param out the stream to write to
+   * @param nullCharacters the byte sequence that represents a null value
+   * @return an int result (NOTE(review): presumably the number of bytes written - confirm)
+   * @throws IOException if writing to the stream fails
+   */
+  int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+
+  /**
+   * Deserializes one datum from a region of a byte array.
+   *
+   * @param col the column the datum belongs to
+   * @param bytes the array holding the serialized value
+   * @param offset the start of the value within {@code bytes}
+   * @param length the length of the value in bytes
+   * @param nullCharacters the byte sequence that represents a null value
+   * @return the deserialized datum
+   * @throws IOException if the value cannot be deserialized
+   */
+  Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
new file mode 100644
index 0000000..d2a692d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -0,0 +1,933 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * StorageManager manages the functions of storing and reading data.
+ * StorageManager is a abstract class.
+ * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class.
+ *
+ */
+public abstract class StorageManager {
+  // One logger for the class instead of one per StorageManager instance.
+  private static final Log LOG = LogFactory.getLog(StorageManager.class);
+
+  /** Constructor parameter types that every scanner class created via reflection must declare. */
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      Schema.class,
+      TableMeta.class,
+      Fragment.class
+  };
+
+  /** Constructor parameter types that every appender class created via reflection must declare. */
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      QueryUnitAttemptId.class,
+      Schema.class,
+      TableMeta.class,
+      Path.class
+  };
+
+  protected TajoConf conf;
+  protected StoreType storeType;
+
+  /**
+   * Cache of StorageManager instances.
+   * The key is the store type followed by the manager key (e.g. the warehouse path).
+   * This is a plain HashMap; the code in this class accesses it inside
+   * synchronized (storageManagers) blocks.
+   */
+  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
+
+  /**
+   * Cache of scanner handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+
+  /**
+   * Cache of appender handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Appender>>();
+
+  /**
+   * Cache of constructors for each class. Because this static map holds strong
+   * references, the cached classes stay reachable for as long as the map does.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  /**
+   * Creates a storage manager for the given store type.
+   * The configuration is not set here; it is assigned later by init(TajoConf).
+   *
+   * @param storeType the store type this manager is created for
+   */
+  public StorageManager(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  /**
+   * Storage-specific initialization hook.
+   * It is invoked by init(TajoConf) after the configuration has been stored in {@code conf}.
+   *
+   * @throws java.io.IOException if the storage cannot be initialized
+   */
+  protected abstract void storageInit() throws IOException;
+
+  /**
+   * This method is called after executing a "CREATE TABLE" statement.
+   * If the storage is file based, the storage manager may create a directory.
+   *
+   * @param tableDesc Description of the table which is created.
+   * @param ifNotExists Creates the table only when the table does not exist.
+   * @throws java.io.IOException if the table cannot be created in the storage
+   */
+  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+
+  /**
+   * This method is called after executing a "DROP TABLE" statement with the 'PURGE' option,
+   * which is the option to delete all the data.
+   *
+   * @param tableDesc Description of the table whose data is to be deleted.
+   * @throws java.io.IOException if the data cannot be deleted
+   */
+  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @param scanNode The logical node for scanning. It may be null: the two-argument
+   *                 getSplits overload of this class passes null here.
+   * @return The list of input fragments.
+   * @throws java.io.IOException if the splits cannot be determined
+   */
+  public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
+                                           ScanNode scanNode) throws IOException;
+
+  /**
+   * Returns the splits that will serve as input for a non-forward query scanner such as 'select * from table1'.
+   * The result list should be small. If there are many fragments to scan, TajoMaster uses paging navigation.
+   * @param tableDesc The table description for the target data.
+   * @param currentPage The current page number within the entire list.
+   * @param numFragments The number of fragments in the result.
+   * @return The list of input fragments.
+   * @throws java.io.IOException if the splits cannot be determined
+   */
+  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
+      throws IOException;
+
+  /**
+   * Returns the storage property of this storage manager.
+   * @return The storage property
+   */
+  public abstract StorageProperty getStorageProperty();
+
+  /**
+   * Releases the resources held by this storage manager.
+   * close() invokes this method on every cached storage manager.
+   */
+  public abstract void closeStorageManager();
+
+  /**
+   * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
+   * In general, Repartitioner determines the partition range using the statistics of the previous output.
+   * In special cases, such as HBase, Repartitioner uses the result of this method.
+   *
+   * @param queryContext The current query context which contains query properties.
+   * @param tableDesc The table description for the target data.
+   * @param inputSchema The input schema
+   * @param sortSpecs The sort specification that contains the sort column and sort order.
+   * @param dataRange A tuple range of the data (NOTE(review): presumably the overall range of the
+   *                  data to be inserted - confirm with the callers)
+   * @return The list of sort ranges.
+   * @throws java.io.IOException if the ranges cannot be determined
+   */
+  public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
+                                                   Schema inputSchema, SortSpec[] sortSpecs,
+                                                   TupleRange dataRange) throws IOException;
+
+  /**
+   * This method is called before executing 'INSERT' or 'CREATE TABLE AS SELECT' (CTAS).
+   * In general, Tajo creates the target table after finishing the final sub-query of a CTAS.
+   * In special cases, such as HBase, an INSERT or CTAS query uses the target table information.
+   * That kind of storage should implement the logic related to creating the table in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException if the preparation fails
+   */
+  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
+
+  /**
+   * It is called when the query has failed.
+   * Each storage manager should implement in this method what has to be done when a query fails.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException if the rollback fails
+   */
+  public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
+
+  /**
+   * Returns the current storage type.
+   * @return the store type that was passed to the constructor
+   */
+  public StoreType getStoreType() {
+    return storeType;
+  }
+
+  /**
+   * Initializes this StorageManager instance. It should be called before the instance is used.
+   * The configuration is stored in {@code conf} and then storageInit() is invoked.
+   *
+   * @param tajoConf the configuration this manager works with
+   * @throws java.io.IOException if storageInit() fails
+   */
+  public void init(TajoConf tajoConf) throws IOException {
+    this.conf = tajoConf;
+    storageInit();
+  }
+
+  /**
+   * Closes all cached StorageManager instances, not only this one, and empties the cache.
+   *
+   * @throws java.io.IOException declared for callers and overriding classes
+   */
+  public void close() throws IOException {
+    synchronized(storageManagers) {
+      for (StorageManager eachStorageManager: storageManagers.values()) {
+        eachStorageManager.closeStorageManager();
+      }
+      // Drop the closed instances so that a later getStorageManager() call creates and
+      // initializes a fresh manager instead of handing out one that was already closed.
+      storageManagers.clear();
+    }
+  }
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   * This is a shortcut for the three-argument getSplits with a null scan node.
+   *
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @return The list of input fragments.
+   * @throws java.io.IOException if the splits cannot be determined
+   */
+  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException {
+    return getSplits(fragmentId, tableDesc, null);
+  }
+
+  /**
+   * Returns the FileStorageManager instance for the warehouse directory configured in tajoConf.
+   * This is a shortcut for the two-argument getFileStorageManager with a null warehouse path.
+   *
+   * @param tajoConf Tajo system property.
+   * @return the storage manager for file based storage
+   * @throws java.io.IOException if the storage manager cannot be created or initialized
+   */
+  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+    return getFileStorageManager(tajoConf, null);
+  }
+
+  /**
+   * Returns the FileStorageManager instance for a warehouse directory.
+   * The given tajoConf is copied first; if warehousePath is not null, WAREHOUSE_DIR is set
+   * on the copy only, so the caller's configuration is left unchanged.
+   * Managers are cached under the key "file" for every local ("file" scheme) warehouse,
+   * otherwise under the full warehouse URI.
+   *
+   * @param tajoConf Tajo system property.
+   * @param warehousePath The warehouse directory to use, or null to use the one in tajoConf.
+   * @return the storage manager for file based storage
+   * @throws java.io.IOException if the storage manager cannot be created or initialized
+   */
+  public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException {
+    final TajoConf copiedConf = new TajoConf(tajoConf);
+    if (warehousePath != null) {
+      copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString());
+    }
+
+    final URI warehouseUri = TajoConf.getWarehouseDir(copiedConf).toUri();
+    final String managerKey;
+    if ("file".equals(warehouseUri.getScheme())) {
+      managerKey = "file";
+    } else {
+      managerKey = warehouseUri.toString();
+    }
+    return getStorageManager(copiedConf, StoreType.CSV, managerKey);
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the name of the store type.
+   * Only the exact name "HBASE" selects the HBASE manager; every other value,
+   * including null, selects the CSV manager.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Name of the storage type
+   * @return the storage manager for the resolved store type
+   * @throws java.io.IOException if the storage manager cannot be created or initialized
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+    final StoreType resolvedType = "HBASE".equals(storeType) ? StoreType.HBASE : StoreType.CSV;
+    return getStorageManager(tajoConf, resolvedType);
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   * This is a shortcut for the three-argument getStorageManager with a null manager key.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return the storage manager for the store type
+   * @throws java.io.IOException if the storage manager cannot be created or initialized
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException {
+    return getStorageManager(tajoConf, storeType, null);
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   * Instances are cached under the key {@code storeType + managerKey}. On a cache miss the
+   * manager class is looked up in the configuration under
+   * "tajo.storage.manager.hbase.class" for HBASE and "tajo.storage.manager.hdfs.class" for every
+   * other store type, instantiated through its (StoreType) constructor, initialized with
+   * tajoConf and then cached.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @param managerKey Key that can identify each storage manager (may be a path)
+   * @return the cached or newly created storage manager
+   * @throws java.io.IOException if no manager class is configured or the initialization fails
+   * @throws RuntimeException if the manager class cannot be instantiated via reflection
+   */
+  public static synchronized StorageManager getStorageManager (
+      TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException {
+    synchronized (storageManagers) {
+      final String cacheKey = storeType + managerKey;
+      final StorageManager cached = storageManagers.get(cacheKey);
+      if (cached != null) {
+        return cached;
+      }
+
+      String typeName;
+      switch (storeType) {
+        case HBASE:
+          typeName = "hbase";
+          break;
+        default:
+          typeName = "hdfs";
+      }
+
+      Class<? extends StorageManager> managerClass =
+          tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
+      if (managerClass == null) {
+        throw new IOException("Unknown Storage Type: " + typeName);
+      }
+
+      StorageManager created;
+      try {
+        Constructor<? extends StorageManager> ctor =
+            (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(managerClass);
+        if (ctor == null) {
+          ctor = managerClass.getDeclaredConstructor(new Class<?>[]{StoreType.class});
+          ctor.setAccessible(true);
+          CONSTRUCTOR_CACHE.put(managerClass, ctor);
+        }
+        created = ctor.newInstance(new Object[]{storeType});
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      // Only a fully initialized manager is put into the cache.
+      created.init(tajoConf);
+      storageManagers.put(cacheKey, created);
+      return created;
+    }
+  }
+
+  /**
+   * Returns Scanner instance.
+   * The fragment proto is converted with FragmentConvertor using this manager's configuration
+   * before the call is delegated to the Fragment based overload.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The serialized fragment for scanning
+   * @param target Columns which are selected.
+   * @return Scanner instance
+   * @throws java.io.IOException if the scanner cannot be created
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
+  }
+
+  /**
+   * Returns Scanner instance that uses the input schema as the target schema as well.
+   *
+   * @param meta The table meta
+   * @param schema The input schema, also used as the output schema
+   * @param fragment The fragment for scanning
+   * @return Scanner instance
+   * @throws java.io.IOException if the scanner cannot be created
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
+    return getScanner(meta, schema, fragment, schema);
+  }
+
+  /**
+   * Returns Scanner instance.
+   * An empty fragment yields a NullScanner which always gets the target columns set.
+   * Otherwise the scanner class registered for the table's store type is instantiated, and the
+   * target columns are set only if that scanner is projectable.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws java.io.IOException if no scanner class is known for the store type
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    final Scanner scanner;
+    final boolean applyTarget;
+
+    if (fragment.isEmpty()) {
+      scanner = new NullScanner(conf, schema, meta, fragment);
+      applyTarget = true;
+    } else {
+      scanner = newScannerInstance(getScannerClass(meta.getStoreType()), conf, schema, meta, fragment);
+      applyTarget = scanner.isProjectable();
+    }
+
+    if (applyTarget) {
+      scanner.setTarget(target.toArray());
+    }
+    return scanner;
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  }
+
+  /**
+   * Returns Appender instance.
+   * The appender class is looked up in the configuration under
+   * "tajo.storage.appender-handler.&lt;store type in lower case&gt;.class" and cached per store type.
+   *
+   * @param queryContext Query property. It is not read by this implementation.
+   * @param taskAttemptId Task id.
+   * @param meta Table meta data.
+   * @param schema Output schema.
+   * @param workDir Working directory
+   * @return Appender instance
+   * @throws java.io.IOException if no appender class is configured for the store type
+   */
+  public Appender getAppender(OverridableConf queryContext,
+                              QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+      throws IOException {
+    String handlerName = meta.getStoreType().name().toLowerCase();
+    Class<? extends Appender> appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+    if (appenderClass == null) {
+      appenderClass = conf.getClass(
+          String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
+
+      // Check before caching: the cache is a ConcurrentHashMap, which rejects null values with a
+      // NullPointerException, so an unknown store type would never reach the IOException below.
+      if (appenderClass == null) {
+        throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+      }
+      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+    }
+
+    return newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+  }
+
+  /**
+   * Creates a scanner instance via reflection.
+   * The class must declare a constructor taking (Configuration, Schema, TableMeta, Fragment);
+   * the constructor is made accessible and cached per class.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param schema Input schema
+   * @param meta Table meta data
+   * @param fragment The fragment for scanning
+   * @param <T> the type of the scanner
+   * @return The scanner instance
+   * @throws RuntimeException wrapping any exception raised while looking up or invoking the constructor
+   */
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+                                         Fragment fragment) {
+    try {
+      Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (ctor == null) {
+        ctor = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+        ctor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, ctor);
+      }
+      return ctor.newInstance(conf, schema, meta, fragment);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates an appender instance via reflection.
+   * The class must declare a constructor taking
+   * (Configuration, QueryUnitAttemptId, Schema, TableMeta, Path); note that the constructor
+   * receives the schema before the table meta, unlike the parameter order of this method.
+   * The constructor is made accessible and cached per class.
+   *
+   * @param theClass Concrete class of appender
+   * @param conf System property
+   * @param taskAttemptId Task id
+   * @param meta Table meta data
+   * @param schema Output schema
+   * @param workDir Working directory
+   * @param <T> the type of the appender
+   * @return The appender instance
+   * @throws RuntimeException wrapping any exception raised while looking up or invoking the constructor
+   */
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId,
+                                          TableMeta meta, Schema schema, Path workDir) {
+    try {
+      Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (ctor == null) {
+        ctor = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+        ctor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, ctor);
+      }
+      return ctor.newInstance(conf, taskAttemptId, schema, meta, workDir);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Return the Scanner class for the StoreType that is defined in storage-default.xml.
+   * The class is looked up in the configuration under
+   * "tajo.storage.scanner-handler.&lt;store type in lower case&gt;.class" and cached per store type.
+   *
+   * @param storeType store type
+   * @return The Scanner class
+   * @throws java.io.IOException if no scanner class is configured for the store type
+   */
+  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+    String handlerName = storeType.name().toLowerCase();
+    Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(
+          String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
+
+      // Check before caching: the cache is a ConcurrentHashMap, which rejects null values with a
+      // NullPointerException, so an unknown store type would never reach the IOException below.
+      if (scannerClass == null) {
+        throw new IOException("Unknown Storage Type: " + storeType.name());
+      }
+      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+    }
+
+    return scannerClass;
+  }
+
+  /**
+   * Returns the length of the fragment.
+   * If the fragment reports TajoConstants.UNKNOWN_LENGTH, the value of
+   * FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration is returned instead.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return the fragment's own length, or the configured alternative if the length is unknown
+   */
+  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
+    final boolean lengthUnknown = fragment.getLength() == TajoConstants.UNKNOWN_LENGTH;
+    return lengthUnknown
+        ? conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH)
+        : fragment.getLength();
+  }
+
+  /**
+   * Hook that is called after the logical plan has been made, so that a storage manager can verify the
+   * schema of the data to be inserted. This implementation performs no check.
+   *
+   * @param tableDesc The table description of insert target.
+   * @param outSchema  The output schema of select query for inserting.
+   * @throws java.io.IOException declared for implementations that perform a real check; never thrown here
+   */
+  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+    // nothing to do
+  }
+
+  /**
+   * Returns the list of storage specified rewrite rules.
+   * These values are used by LogicalOptimizer.
+   *
+   * This implementation has no rules of its own and returns null, so callers must handle a null result.
+   *
+   * @param queryContext The query property
+   * @param tableDesc The description of the target table.
+   * @return The list of storage specified rewrite rules; null in this implementation
+   * @throws java.io.IOException declared for implementations that need I/O; never thrown here
+   */
+  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
+    return null;
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory, and this method moves it to the
+   * final directory.
+   *
+   * It delegates to the six-argument overload with <code>changeFileSeq</code> set to true, i.e. result file
+   * names are rewritten with a new sequence number when they are moved.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @return Saved path
+   * @throws java.io.IOException if the six-argument overload fails
+   */
+  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc) throws IOException {
+    // true: attach a fresh sequence number to each moved result file
+    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true);
+  }
+
+  /**
+   * Finalizes result data by moving it out of the staging directory named by QueryVars.STAGING_DIR.
+   *
+   * If QueryVars.OUTPUT_TABLE_PATH is unset or empty, nothing is moved and the result directory inside the
+   * staging directory is returned. Otherwise one of three strategies is applied:
+   * <ul>
+   *   <li>INSERT OVERWRITE (QueryVars.OUTPUT_OVERWRITE is true): existing output is moved to a backup location
+   *   under the staging directory and replaced by the staged result. When an IOException occurs, the backed-up
+   *   data is moved back where possible and the exception is rethrown.</li>
+   *   <li>INSERT INTO (QueryVars.COMMAND_TYPE is INSERT): staged files are moved one by one into the output
+   *   directory.</li>
+   *   <li>Anything else (CREATE TABLE AS SELECT): the staged result directory is renamed to the output
+   *   directory.</li>
+   * </ul>
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @param changeFileSeq If true change result file name with max sequence.
+   * @return Saved path
+   * @throws java.io.IOException if a file system operation fails while committing
+   */
+  protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc, boolean changeFileSeq) throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+      FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+      if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
+
+        // It moves the original table into the temporary location.
+        // Then it moves the new result table into the original table location.
+        // Upon failure, it recovers the original table if possible.
+        boolean movedToOldTable = false;
+        boolean committed = false;
+        Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+
+        if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+          // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
+          // renaming directory.
+          Map<Path, Path> renameDirs = TUtil.newHashMap();
+          // This is a map for recovering existing partition directory. A key is current directory and a value is
+          // temporary directory to back up.
+          Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+          try {
+            if (!fs.exists(finalOutputDir)) {
+              fs.mkdirs(finalOutputDir);
+            }
+
+            visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                renameDirs, oldTableDir);
+
+            // Rename target partition directories
+            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+              // Backup existing data files for recovering
+              if (fs.exists(entry.getValue())) {
+                // Literal replace: paths are not regular expressions, so replaceAll() would misread
+                // characters such as '.', '+', '$' or '\' that are legal in directory names.
+                String recoveryPathString = entry.getValue().toString().replace(finalOutputDir.toString(),
+                    oldTableDir.toString());
+                Path recoveryPath = new Path(recoveryPathString);
+                fs.rename(entry.getValue(), recoveryPath);
+                recoveryDirs.put(entry.getValue(), recoveryPath);
+              }
+              // Delete existing directory
+              fs.delete(entry.getValue(), true);
+              // Rename staging directory to final output directory
+              fs.rename(entry.getKey(), entry.getValue());
+            }
+
+          } catch (IOException ioe) {
+            // Remove created dirs
+            // NOTE(review): this also removes target directories that the loop above had not reached yet,
+            // i.e. ones without a backup - confirm that this is intended.
+            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+              fs.delete(entry.getValue(), true);
+            }
+
+            // Recovery renamed dirs: key is the original location, value is its backup.
+            for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+              // Clear the original location, never the backup; deleting the backup would destroy the only
+              // copy of the old data right before it is moved back.
+              fs.delete(entry.getKey(), true);
+              fs.rename(entry.getValue(), entry.getKey());
+            }
+            throw new IOException(ioe.getMessage(), ioe);
+          }
+        } else {
+          try {
+            if (fs.exists(finalOutputDir)) {
+              fs.rename(finalOutputDir, oldTableDir);
+              movedToOldTable = fs.exists(oldTableDir);
+            } else { // if the parent does not exist, make its parent directory.
+              fs.mkdirs(finalOutputDir.getParent());
+            }
+
+            fs.rename(stagingResultDir, finalOutputDir);
+            committed = fs.exists(finalOutputDir);
+          } catch (IOException ioe) {
+            // recover the old table
+            if (movedToOldTable && !committed) {
+              fs.rename(oldTableDir, finalOutputDir);
+            }
+            // Report the failure like the partitioned branch does, instead of returning as if the commit
+            // had succeeded.
+            throw new IOException(ioe.getMessage(), ioe);
+          }
+        }
+      } else {
+        String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+        if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
+
+          // Formats sequence numbers without grouping and with at least three digits, e.g. 007.
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(3);
+
+          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+              if (eachFile.isFile()) {
+                LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                continue;
+              }
+              // The sequence passed here is not used for directories; each one computes its own.
+              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
+            }
+          } else {
+            int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
+            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+              if (eachFile.getPath().getName().startsWith("_")) {
+                continue;
+              }
+              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
+            }
+          }
+          // checking all file moved and remove empty dir
+          verifyAllFileMoved(fs, stagingResultDir);
+          FileStatus[] files = fs.listStatus(stagingResultDir);
+          if (files != null && files.length != 0) {
+            for (FileStatus eachFile: files) {
+              LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+            }
+          }
+        } else { // CREATE TABLE AS SELECT (CTAS)
+          fs.rename(stagingResultDir, finalOutputDir);
+          LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+        }
+      }
+    } else {
+      // No output table path: the result stays in the staging area.
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    }
+
+    return finalOutputDir;
+  }
+
+  /**
+   * Attach the sequence number to the output file name and then move the file into the final result path.
+   *
+   * A directory is mirrored under the final output path and its entries are moved recursively, skipping
+   * entries whose name starts with "_". Files inside a directory get sequence numbers that continue after
+   * the maximum sequence found in the corresponding final directory.
+   * A file that cannot be renamed is only logged; no exception is raised for it.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The file status
+   * @param finalOutputPath Final output path
+   * @param nf Number format
+   * @param fileSeq The sequence number; only used for a file, and only when changeFileSeq is true
+   * @param changeFileSeq If true, the last token of the file name is replaced with the formatted fileSeq
+   * @throws java.io.IOException if a directory is not located under the staging result dir, if the target file
+   *         already exists, or if a file system operation fails
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) throws IOException {
+    // Path of the entry relative to the staging result dir; null when it is not located under it.
+    String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+
+    if (fileStatus.isDirectory()) {
+      if (subPath == null) {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
+      }
+
+      Path finalSubPath = new Path(finalOutputPath, subPath);
+      if (!fs.exists(finalSubPath)) {
+        fs.mkdirs(finalSubPath);
+      }
+      int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+      for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+        if (eachFile.getPath().getName().startsWith("_")) {
+          continue;
+        }
+        moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
+      }
+      return;
+    }
+
+    if (subPath == null) {
+      // The file is left where it is, as before, but no longer silently.
+      LOG.warn("Skipped staging file[" + fileStatus.getPath() + "] " +
+          "because it is not located under staging dir[" + stagingResultDir + "]");
+      return;
+    }
+
+    Path finalSubPath = new Path(finalOutputPath, subPath);
+    if (changeFileSeq) {
+      finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
+    }
+    if (!fs.exists(finalSubPath.getParent())) {
+      fs.mkdirs(finalSubPath.getParent());
+    }
+    if (fs.exists(finalSubPath)) {
+      throw new IOException("Already exists data file:" + finalSubPath);
+    }
+    boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+    if (success) {
+      LOG.info("Moving staging file[" + fileStatus.getPath() + "] " +
+          "to final output[" + finalSubPath + "]");
+    } else {
+      LOG.error("Can't move staging file[" + fileStatus.getPath() + "] " +
+          "to final output[" + finalSubPath + "]");
+    }
+  }
+
+  /**
+   * Removes the path of the parent from the path of the child.
+   * Only the path components of the two locations are compared; scheme and authority are ignored.
+   *
+   * @param parentPath The directory that is expected to contain the child
+   * @param childPath The path to make relative
+   * @return The child path relative to the parent (without a leading separator), or null if the child is not
+   *         located strictly below the parent
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    // Compare against the parent including its trailing separator, so that a sibling which merely shares
+    // a name prefix ("/a/b" vs "/a/bc") is rejected. A root parent already ends with the separator.
+    String parentPrefix = parentPathStr.endsWith("/") ? parentPathStr : parentPathStr + "/";
+
+    // The length check also covers child == parent, for which there is no sub path to return.
+    if (childPathStr.length() <= parentPrefix.length() || !childPathStr.startsWith(parentPrefix)) {
+      return null;
+    }
+
+    return childPathStr.substring(parentPrefix.length());
+  }
+
+  /**
+   * Builds a file name in which the last of the four "-"-separated tokens is replaced by the formatted
+   * sequence number.
+   *
+   * @param path Path whose file name is split into tokens
+   * @param seq sequence number
+   * @param nf Number format used to render the sequence number
+   * @return New file name carrying the sequence number
+   * @throws java.io.IOException if the file name does not consist of exactly four tokens
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
+    String[] nameParts = path.getName().split("-");
+    if (nameParts.length == 4) {
+      // Keep the first three tokens and swap in the new sequence as the fourth.
+      StringBuilder renamed = new StringBuilder();
+      for (int i = 0; i < 3; i++) {
+        renamed.append(nameParts[i]).append("-");
+      }
+      return renamed.append(nf.format(seq)).toString();
+    }
+    throw new IOException("Wrong result file name:" + path);
+  }
+
+  /**
+   * Make sure all files are moved.
+   * Walks the staging directory recursively and stops at the first remaining file. Every sub directory
+   * that turns out to contain no file is deleted on the way.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory
+   * @return true if no file is left below the staging directory, false as soon as one is found
+   * @throws java.io.IOException if listing or deleting fails
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
+    FileStatus[] entries = fs.listStatus(stagingPath);
+    if (entries == null) {
+      return true;
+    }
+
+    for (FileStatus entry : entries) {
+      if (entry.isFile()) {
+        LOG.error("There are some unmoved files in staging dir:" + entry.getPath());
+        return false;
+      }
+      if (!verifyAllFileMoved(fs, entry.getPath())) {
+        return false;
+      }
+      // Non-recursive delete of the verified sub directory.
+      fs.delete(entry.getPath(), false);
+    }
+
+    return true;
+  }
+
+  /**
+   * This method fills a rename map which maps staging directories to final output directories, recursively.
+   *
+   * For every sub directory of the staging path it creates the matching backup directory under oldTableDir
+   * and descends into it. A directory without any sub directory is registered in renameDirs together with
+   * its final location; a directory that has sub directories only gets its final location created.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory to visit
+   * @param outputPath The final output directory
+   * @param stagingParentPathString The string form of the top staging directory; it is the part of each visited
+   *                                path that gets replaced to build the backup and final paths
+   * @param renameDirs Output map; a key is a staging directory and a value is its final output directory
+   * @param oldTableDir The directory under which backup directories are created
+   * @throws java.io.IOException if a file system operation fails
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for(FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make recover directory.
+        // Literal replace: paths are not regular expressions, so replaceAll() would misread characters
+        // such as '.', '+', '$' or '\' that are legal in directory names.
+        String recoverPathString = oldPath.toString().replace(stagingParentPathString,
+            oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
+            renameDirs, oldTableDir);
+        // Find last order partition for renaming
+        String newPathString = oldPath.toString().replace(stagingParentPathString,
+            outputPath.toString());
+        Path newPath = new Path(newPathString);
+        // isLeafDirectory() returns true when the directory HAS sub directories, so the negation selects
+        // the directories without any sub directory.
+        if (!isLeafDirectory(fs, eachFile.getPath())) {
+          renameDirs.put(eachFile.getPath(), newPath);
+        } else {
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells whether the given directory contains at least one sub directory.
+   * Note that, despite the name, the result is true for a directory that HAS sub directories and false for
+   * one that has none.
+   *
+   * @param fs FileSystem
+   * @param path The directory to inspect
+   * @return true if any direct child of the path is a directory, false otherwise
+   * @throws java.io.IOException if listing the directory fails
+   */
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+    for (FileStatus child : fs.listStatus(path)) {
+      if (fs.isDirectory(child.getPath())) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
new file mode 100644
index 0000000..6816d08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+/**
+ * A plain holder of two boolean flags describing a storage: one exposed through
+ * {@link #isSupportsInsertInto()} and one exposed through {@link #isSortedInsert()}.
+ * Both flags are false until they are set.
+ */
+public class StorageProperty {
+  /** Backing field of {@link #isSupportsInsertInto()}. */
+  private boolean supportsInsertInto;
+  /** Backing field of {@link #isSortedInsert()}. */
+  private boolean sortedInsert;
+
+  /**
+   * @return the current value of the supportsInsertInto flag
+   */
+  public boolean isSupportsInsertInto() {
+    return this.supportsInsertInto;
+  }
+
+  /**
+   * @return the current value of the sortedInsert flag
+   */
+  public boolean isSortedInsert() {
+    return this.sortedInsert;
+  }
+
+  /**
+   * @param supportsInsertInto the new value of the supportsInsertInto flag
+   */
+  public void setSupportsInsertInto(boolean supportsInsertInto) {
+    this.supportsInsertInto = supportsInsertInto;
+  }
+
+  /**
+   * @param sortedInsert the new value of the sortedInsert flag
+   */
+  public void setSortedInsert(boolean sortedInsert) {
+    this.sortedInsert = sortedInsert;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
new file mode 100644
index 0000000..54fdb69
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.util.FileUtil;
+import sun.nio.ch.DirectBuffer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers for storage code: size estimation of rows and columns, writing table meta data,
+ * path concatenation, result file sequence lookup and low-level buffer/stream utilities.
+ */
+public class StorageUtil extends StorageConstants {
+  /**
+   * Sums up {@link #getColByteSize(Column)} over all columns of the schema.
+   *
+   * @param schema The schema to measure
+   * @return The sum of the per-column byte sizes
+   */
+  public static int getRowByteSize(Schema schema) {
+    int sum = 0;
+    for(Column col : schema.getColumns()) {
+      sum += StorageUtil.getColByteSize(col);
+    }
+
+    return sum;
+  }
+
+  /**
+   * Returns the byte size assigned to the data type of the given column.
+   *
+   * @param col The column
+   * @return The size assigned to the column's type, or 0 for a type that is not listed here
+   */
+  public static int getColByteSize(Column col) {
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+      case CHAR:
+      case BIT:
+        return 1;
+      case INT2:
+        return 2;
+      case INT4:
+      case FLOAT4:
+      case INET4:
+      case DATE:
+        return 4;
+      case INT8:
+      case FLOAT8:
+      case TIME:
+      case TIMESTAMP:
+        return 8;
+      case INET6:
+        return 32;
+      case TEXT:
+      case BLOB:
+        return 256;
+      default:
+        return 0;
+    }
+  }
+
+  /**
+   * Writes the protobuf form of the table meta into the file ".meta" directly under the table root.
+   *
+   * @param conf Configuration used to obtain the file system of the table root
+   * @param tableroot The table root directory
+   * @param meta The table meta to write
+   * @throws java.io.IOException if the file cannot be created or written
+   */
+  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
+    FileSystem fs = tableroot.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
+    try {
+      FileUtil.writeProto(out, meta.getProto());
+      out.flush();
+    } finally {
+      // Close even when writing fails, so that the stream is not leaked.
+      out.close();
+    }
+  }
+
+  /**
+   * Same as {@link #concatPath(Path, String...)} with the parent given as a string.
+   *
+   * @param parent The parent path
+   * @param childs The child names, from the outermost to the innermost
+   * @return The concatenated path
+   */
+  public static Path concatPath(String parent, String...childs) {
+    return concatPath(new Path(parent), childs);
+  }
+
+  /**
+   * Joins the child names with "/" and resolves the result against the parent.
+   *
+   * @param parent The parent path
+   * @param childs The child names, from the outermost to the innermost
+   * @return The concatenated path
+   */
+  public static Path concatPath(Path parent, String...childs) {
+    StringBuilder sb = new StringBuilder();
+
+    for(int i=0; i < childs.length; i++) {
+      sb.append(childs[i]);
+      if(i < childs.length - 1)
+        sb.append("/");
+    }
+
+    return new Path(parent, sb.toString());
+  }
+
+  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
+  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
+
+  /**
+   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*".
+   *
+   * This method finds the maximum sequence number from existing data files through the above patterns.
+   * Only names of the second form carry a sequence number (their fourth token); names of the first form
+   * are ignored. If it cannot find any file with a sequence number, it will return -1.
+   *
+   * @param fs FileSystem
+   * @param path The directory to inspect; -1 is returned if it is not a directory
+   * @param recursive If true, sub directories are searched as well and the largest value among all of them
+   *                  is returned
+   * @return The maximum sequence number, or -1 if there is none
+   * @throws java.io.IOException if the directory cannot be listed
+   */
+  public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException {
+    if (!fs.isDirectory(path)) {
+      return -1;
+    }
+
+    FileStatus[] files = fs.listStatus(path);
+
+    if (files == null || files.length == 0) {
+      return -1;
+    }
+
+    int maxValue = -1;
+
+    for (FileStatus eachFile: files) {
+      // In the case of partition table, return largest value within all partition dirs.
+      if (eachFile.isDirectory() && recursive) {
+        maxValue = Math.max(maxValue, getMaxFileSequence(fs, eachFile.getPath(), recursive));
+      } else {
+        String fileName = eachFile.getPath().getName();
+        if (fileName.matches(fileNamePatternV08) || fileName.matches(fileNamePatternV09)) {
+          // 0.8: fileName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
+          // 0.9: fileName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
+          String[] pathTokens = fileName.split("-");
+          if (pathTokens.length == 4) {
+            // Take the maximum over every file instead of trusting the listing order: the order of
+            // listStatus() results is not numeric, and sequences may have different digit counts.
+            maxValue = Math.max(maxValue, Integer.parseInt(pathTokens[3]));
+          }
+        }
+      }
+    }
+
+    return maxValue;
+  }
+
+  /**
+   * Releases the given buffer: a direct buffer is cleaned through its cleaner, any other buffer is cleared.
+   * A null buffer is ignored.
+   *
+   * @param buffer The buffer to release, may be null
+   */
+  public static void closeBuffer(ByteBuffer buffer) {
+    if (buffer != null) {
+      if (buffer.isDirect()) {
+        ((DirectBuffer) buffer).cleaner().clean();
+      } else {
+        buffer.clear();
+      }
+    }
+  }
+
+  /**
+   * Reads from the stream until the requested number of bytes has been read or the end of the stream
+   * is reached.
+   *
+   * @param is The stream to read from
+   * @param buffer The destination buffer
+   * @param offset The position in the buffer at which the first byte is stored
+   * @param length The number of bytes to read
+   * @return The number of bytes actually read, which is less than length if the stream ended early;
+   *         the negative value returned by the stream if it ended before a single byte was read
+   * @throws java.io.IOException if reading fails
+   */
+  public static int readFully(InputStream is, byte[] buffer, int offset, int length)
+      throws IOException {
+    int nread = 0;
+    while (nread < length) {
+      int nbytes = is.read(buffer, offset + nread, length - nread);
+      if (nbytes < 0) {
+        return nread > 0 ? nread : nbytes;
+      }
+      nread += nbytes;
+    }
+    return nread;
+  }
+
+  /**
+   * Similar to readFully(). Skips bytes in a loop.
+   * @param in The DataInput to skip bytes from
+   * @param len number of bytes to skip.
+   * @throws java.io.IOException if it could not skip requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void skipFully(DataInput in, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      long ret = in.skipBytes(amt);
+      if (ret == 0) {
+        // skipBytes may return 0 even if we're not at EOF, so consume a single byte instead.
+        // DataInput.readByte() signals EOF by throwing EOFException; a returned -1 is just the
+        // data byte 0xFF and must not be mistaken for the end of the input.
+        try {
+          in.readByte();
+        } catch (EOFException e) {
+          EOFException premature = new EOFException( "Premature EOF from inputStream after " +
+              "skipping " + (len - amt) + " byte(s).");
+          premature.initCause(e);
+          throw premature;
+        }
+        ret = 1;
+      }
+      amt -= ret;
+    }
+  }
+}


Mime
View raw message